ROOTPLOIT
Server: LiteSpeed
System: Linux in-mum-web1878.main-hosting.eu 5.14.0-570.21.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jun 11 07:22:35 EDT 2025 x86_64
User: u435929562 (435929562)
PHP: 7.4.33
Disabled: system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
Upload Files
File: //opt/go/pkg/mod/go.mongodb.org/[email protected]/mongo/integration/sdam_error_handling_test.go
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

//go:build go1.13
// +build go1.13

package integration

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/internal/assert"
	"go.mongodb.org/mongo-driver/internal/eventtest"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func TestSDAMErrorHandling(t *testing.T) {
	mt := mtest.New(t, noClientOpts)
	baseClientOpts := func() *options.ClientOptions {
		return options.Client().
			ApplyURI(mtest.ClusterURI()).
			SetRetryWrites(false).
			SetWriteConcern(mtest.MajorityWc)
	}
	baseMtOpts := func() *mtest.Options {
		mtOpts := mtest.NewOptions().
			Topologies(mtest.ReplicaSet, mtest.Single). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
			MinServerVersion("4.0").                    // 4.0+ is required to use failpoints on replica sets.
			ClientOptions(baseClientOpts())

		if mtest.ClusterTopologyKind() == mtest.Sharded {
			// Pin to a single mongos because the tests use failpoints.
			mtOpts.ClientType(mtest.Pinned)
		}
		return mtOpts
	}

	// Set min server version of 4.4 because the during-handshake tests use failpoint features introduced in 4.4 like
	// blockConnection and appName.
	mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
			mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
				// Assert that the pool is cleared when a connection created by an application
				// operation thread encounters a timeout caused by socketTimeoutMS during
				// handshaking.

				appName := "authConnectTimeoutTest"
				// Set failpoint on saslContinue instead of saslStart because saslStart isn't done when using
				// speculative auth.
				mt.SetFailPoint(mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode: mtest.FailPointMode{
						Times: 1,
					},
					Data: mtest.FailPointData{
						FailCommands:    []string{"saslContinue"},
						BlockConnection: true,
						BlockTimeMS:     150,
						AppName:         appName,
					},
				})

				// Reset the client with the appName specified in the failpoint and the pool monitor.
				tpm := eventtest.NewTestPoolMonitor()
				mt.ResetClient(baseClientOpts().
					SetAppName(appName).
					SetPoolMonitor(tpm.PoolMonitor).
					// Set a 100ms socket timeout so that the saslContinue delay of 150ms causes a
					// timeout during socket read (i.e. a timeout not caused by the InsertOne context).
					SetSocketTimeout(100 * time.Millisecond))

				// Use context.Background() so that the new connection will not time out due to an
				// operation-scoped timeout.
				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
				assert.NotNil(mt, err, "expected InsertOne error, got nil")
				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
				// Assert that the pool is cleared within 2 seconds.
				assert.Soon(mt, func(ctx context.Context) {
					ticker := time.NewTicker(100 * time.Millisecond)
					defer ticker.Stop()

					for {
						select {
						case <-ticker.C:
						case <-ctx.Done():
							return
						}

						if tpm.IsPoolCleared() {
							return
						}
					}
				}, 2*time.Second)
			})

			mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
				mt.Run("background", func(mt *mtest.T) {
					// Assert that the pool is cleared when a connection created by the background pool maintenance
					// routine encounters a non-timeout network error during handshaking.
					appName := "authNetworkErrorTestBackground"

					mt.SetFailPoint(mtest.FailPoint{
						ConfigureFailPoint: "failCommand",
						Mode: mtest.FailPointMode{
							Times: 1,
						},
						Data: mtest.FailPointData{
							FailCommands:    []string{"saslContinue"},
							CloseConnection: true,
							AppName:         appName,
						},
					})

					// Reset the client with the appName specified in the failpoint.
					tpm := eventtest.NewTestPoolMonitor()
					mt.ResetClient(baseClientOpts().
						SetAppName(appName).
						SetPoolMonitor(tpm.PoolMonitor).
						// Set minPoolSize to enable the background pool maintenance goroutine.
						SetMinPoolSize(5))

					// Assert that the pool is cleared within 2 seconds.
					assert.Soon(mt, func(ctx context.Context) {
						ticker := time.NewTicker(100 * time.Millisecond)
						defer ticker.Stop()

						for {
							select {
							case <-ticker.C:
							case <-ctx.Done():
								return
							}

							if tpm.IsPoolCleared() {
								return
							}
						}
					}, 2*time.Second)
				})

				mt.Run("foreground", func(mt *mtest.T) {
					// Assert that the pool is cleared when a connection created by an application thread connection
					// checkout encounters a non-timeout network error during handshaking.
					appName := "authNetworkErrorTestForeground"

					mt.SetFailPoint(mtest.FailPoint{
						ConfigureFailPoint: "failCommand",
						Mode: mtest.FailPointMode{
							Times: 1,
						},
						Data: mtest.FailPointData{
							FailCommands:    []string{"saslContinue"},
							CloseConnection: true,
							AppName:         appName,
						},
					})

					// Reset the client with the appName specified in the failpoint.
					tpm := eventtest.NewTestPoolMonitor()
					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

					_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
					assert.NotNil(mt, err, "expected InsertOne error, got nil")
					assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)

					// Assert that the pool is cleared within 2 seconds.
					assert.Soon(mt, func(ctx context.Context) {
						ticker := time.NewTicker(100 * time.Millisecond)
						defer ticker.Stop()

						for {
							select {
							case <-ticker.C:
							case <-ctx.Done():
								return
							}

							if tpm.IsPoolCleared() {
								return
							}
						}
					}, 2*time.Second)
				})
			})
		})
	})
	mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
		mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
			mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
				appName := "afterHandshakeNetworkError"

				mt.SetFailPoint(mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode: mtest.FailPointMode{
						Times: 1,
					},
					Data: mtest.FailPointData{
						FailCommands:    []string{"insert"},
						CloseConnection: true,
						AppName:         appName,
					},
				})

				// Reset the client with the appName specified in the failpoint.
				tpm := eventtest.NewTestPoolMonitor()
				mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
				assert.NotNil(mt, err, "expected InsertOne error, got nil")
				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
				assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
			})
			mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
				tpm := eventtest.NewTestPoolMonitor()
				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
				assert.Nil(mt, err, "InsertOne error: %v", err)

				filter := bson.M{
					"$where": "function() { sleep(1000); return false; }",
				}
				timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
				defer cancel()
				_, err = mt.Coll.Find(timeoutCtx, filter)
				assert.NotNil(mt, err, "expected Find error, got %v", err)
				assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
			})
			mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
				tpm := eventtest.NewTestPoolMonitor()
				mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))

				_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
				assert.Nil(mt, err, "InsertOne error: %v", err)

				findCtx, cancel := context.WithCancel(context.Background())
				go func() {
					time.Sleep(100 * time.Millisecond)
					cancel()
				}()

				filter := bson.M{
					"$where": "function() { sleep(1000); return false; }",
				}
				_, err = mt.Coll.Find(findCtx, filter)
				assert.NotNil(mt, err, "expected Find error, got nil")
				assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
				assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
				assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)
				assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
			})
		})
		mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
			// Integration tests for the SDAM error handling code path for errors in server response documents. These
			// errors can be part of the top-level document in ok:0 responses or in a nested writeConcernError document.

			// On 4.4, some state change errors include a topologyVersion field. Because we're triggering these errors
			// via failCommand, the topologyVersion does not actually change as it would in an actual state change.
			// This causes the SDAM error handling code path to think we've already handled this state change and
			// ignore the error because it's stale. To avoid this altogether, we cap the test to <= 4.2.
			serverErrorsMtOpts := baseMtOpts().
				MinServerVersion("4.0"). // failCommand support
				MaxServerVersion("4.2").
				ClientOptions(baseClientOpts().SetRetryWrites(false))

			testCases := []struct {
				name      string
				errorCode int32

				// For shutdown errors, the pool is always cleared. For non-shutdown errors, the pool is only cleared
				// for pre-4.2 servers.
				isShutdownError bool
			}{
				// "node is recovering" errors
				{"InterruptedAtShutdown", 11600, true},
				{"InterruptedDueToReplStateChange, not shutdown", 11602, false},
				{"NotPrimaryOrSecondary", 13436, false},
				{"PrimarySteppedDown", 189, false},
				{"ShutdownInProgress", 91, true},

				// "not primary" errors
				{"NotPrimary", 10107, false},
				{"NotPrimaryNoSecondaryOk", 13435, false},
			}
			for _, tc := range testCases {
				mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
					appName := fmt.Sprintf("command_error_%s", tc.name)

					// Cause the next insert to fail with an ok:0 response.
					mt.SetFailPoint(mtest.FailPoint{
						ConfigureFailPoint: "failCommand",
						Mode: mtest.FailPointMode{
							Times: 1,
						},
						Data: mtest.FailPointData{
							FailCommands: []string{"insert"},
							ErrorCode:    tc.errorCode,
							AppName:      appName,
						},
					})

					// Reset the client with the appName specified in the failpoint.
					tpm := eventtest.NewTestPoolMonitor()
					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

					runServerErrorsTest(mt, tc.isShutdownError, tpm)
				})
				mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
					appName := fmt.Sprintf("write_concern_error_%s", tc.name)

					// Cause the next insert to fail with a write concern error.
					mt.SetFailPoint(mtest.FailPoint{
						ConfigureFailPoint: "failCommand",
						Mode: mtest.FailPointMode{
							Times: 1,
						},
						Data: mtest.FailPointData{
							FailCommands: []string{"insert"},
							WriteConcernError: &mtest.WriteConcernErrorData{
								Code: tc.errorCode,
							},
							AppName: appName,
						},
					})

					// Reset the client with the appName specified in the failpoint.
					tpm := eventtest.NewTestPoolMonitor()
					mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))

					runServerErrorsTest(mt, tc.isShutdownError, tpm)
				})
			}
		})
	})
}

func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *eventtest.TestPoolMonitor) {
	mt.Helper()

	_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
	assert.NotNil(mt, err, "expected InsertOne error, got nil")

	// The pool should always be cleared for shutdown errors, regardless of server version.
	if isShutdownError {
		assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
		return
	}

	// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
	wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
	gotCleared := tpm.IsPoolCleared()
	assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
		wantCleared, gotCleared)
}