Skip to content

Commit 03a4a86

Browse files
tianzhouclaude
andauthored
fix: use dedicated connection in ApplySchema to preserve search_path (#375)
ApplySchema used separate ExecContext calls on *sql.DB (connection pool) for SET search_path and SQL execution. Go's database/sql does not guarantee the same connection across calls, so session-scoped settings like search_path could be lost, causing objects to be created in the wrong schema. Fix: acquire a single *sql.Conn from the pool and use it for all statements, guaranteeing SET search_path affects subsequent SQL. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0e9d685 commit 03a4a86

3 files changed

Lines changed: 35 additions & 8 deletions

File tree

cmd/util/sql_logger.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,16 @@ import (
77
"github.com/pgplex/pgschema/internal/logger"
88
)
99

10+
// execer is an interface satisfied by both *sql.DB and *sql.Conn,
11+
// allowing ExecContextWithLogging to work with either.
12+
type execer interface {
13+
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
14+
}
15+
1016
// ExecContextWithLogging executes SQL with debug logging if debug mode is enabled.
1117
// It logs the SQL statement before execution and the result/error after execution.
12-
func ExecContextWithLogging(ctx context.Context, db *sql.DB, sqlStmt string, description string) (sql.Result, error) {
18+
// It accepts both *sql.DB and *sql.Conn via the execer interface.
19+
func ExecContextWithLogging(ctx context.Context, db execer, sqlStmt string, description string) (sql.Result, error) {
1320
isDebug := logger.IsDebug()
1421
if isDebug {
1522
logger.Get().Debug("Executing SQL", "description", description, "sql", sqlStmt)

internal/postgres/embedded.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,22 +191,32 @@ func (ep *EmbeddedPostgres) GetSchemaName() string {
191191
// This ensures a clean state before applying the desired schema definition.
192192
// Note: The schema parameter is ignored - we always use the temporary schema name.
193193
func (ep *EmbeddedPostgres) ApplySchema(ctx context.Context, schema string, sql string) error {
194+
// Acquire a single dedicated connection to ensure SET search_path affects
195+
// all subsequent statements. Using *sql.DB (connection pool) does not
196+
// guarantee the same connection across ExecContext calls, so session-scoped
197+
// settings like search_path may be lost.
198+
conn, err := ep.db.Conn(ctx)
199+
if err != nil {
200+
return fmt.Errorf("failed to acquire connection: %w", err)
201+
}
202+
defer conn.Close()
203+
194204
// Drop the temporary schema if it exists (CASCADE to drop all objects)
195205
dropSchemaSQL := fmt.Sprintf("DROP SCHEMA IF EXISTS \"%s\" CASCADE", ep.tempSchema)
196-
if _, err := util.ExecContextWithLogging(ctx, ep.db, dropSchemaSQL, "drop temporary schema"); err != nil {
206+
if _, err := util.ExecContextWithLogging(ctx, conn, dropSchemaSQL, "drop temporary schema"); err != nil {
197207
return fmt.Errorf("failed to drop temporary schema %s: %w", ep.tempSchema, err)
198208
}
199209

200210
// Create the temporary schema
201211
createSchemaSQL := fmt.Sprintf("CREATE SCHEMA \"%s\"", ep.tempSchema)
202-
if _, err := util.ExecContextWithLogging(ctx, ep.db, createSchemaSQL, "create temporary schema"); err != nil {
212+
if _, err := util.ExecContextWithLogging(ctx, conn, createSchemaSQL, "create temporary schema"); err != nil {
203213
return fmt.Errorf("failed to create temporary schema %s: %w", ep.tempSchema, err)
204214
}
205215

206216
// Set search_path to the temporary schema, with public as fallback
207217
// for resolving extension types installed in public schema (issue #197)
208218
setSearchPathSQL := fmt.Sprintf("SET search_path TO \"%s\", public", ep.tempSchema)
209-
if _, err := util.ExecContextWithLogging(ctx, ep.db, setSearchPathSQL, "set search_path for desired state"); err != nil {
219+
if _, err := util.ExecContextWithLogging(ctx, conn, setSearchPathSQL, "set search_path for desired state"); err != nil {
210220
return fmt.Errorf("failed to set search_path: %w", err)
211221
}
212222

@@ -227,7 +237,7 @@ func (ep *EmbeddedPostgres) ApplySchema(ctx context.Context, schema string, sql
227237
// Execute the SQL directly
228238
// Note: Desired state SQL should never contain operations like CREATE INDEX CONCURRENTLY
229239
// that cannot run in transactions. Those are migration details, not state declarations.
230-
if _, err := util.ExecContextWithLogging(ctx, ep.db, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
240+
if _, err := util.ExecContextWithLogging(ctx, conn, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
231241
return fmt.Errorf("failed to apply schema SQL to temporary schema %s: %w", ep.tempSchema, err)
232242
}
233243

internal/postgres/external.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,26 @@ func (ed *ExternalDatabase) ApplySchema(ctx context.Context, schema string, sql
108108
// Note: We use the temporary schema name (ed.tempSchema) instead of the user-provided schema name
109109
// This ensures we don't interfere with existing schemas in the external database
110110

111+
// Acquire a single dedicated connection to ensure SET search_path affects
112+
// all subsequent statements. Using *sql.DB (connection pool) does not
113+
// guarantee the same connection across ExecContext calls, so session-scoped
114+
// settings like search_path may be lost.
115+
conn, err := ed.db.Conn(ctx)
116+
if err != nil {
117+
return fmt.Errorf("failed to acquire connection: %w", err)
118+
}
119+
defer conn.Close()
120+
111121
// Create the temporary schema
112122
createSchemaSQL := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS \"%s\"", ed.tempSchema)
113-
if _, err := util.ExecContextWithLogging(ctx, ed.db, createSchemaSQL, "create temporary schema"); err != nil {
123+
if _, err := util.ExecContextWithLogging(ctx, conn, createSchemaSQL, "create temporary schema"); err != nil {
114124
return fmt.Errorf("failed to create temporary schema %s: %w", ed.tempSchema, err)
115125
}
116126

117127
// Set search_path to the temporary schema, with public as fallback
118128
// for resolving extension types installed in public schema (issue #197)
119129
setSearchPathSQL := fmt.Sprintf("SET search_path TO \"%s\", public", ed.tempSchema)
120-
if _, err := util.ExecContextWithLogging(ctx, ed.db, setSearchPathSQL, "set search_path for desired state"); err != nil {
130+
if _, err := util.ExecContextWithLogging(ctx, conn, setSearchPathSQL, "set search_path for desired state"); err != nil {
121131
return fmt.Errorf("failed to set search_path: %w", err)
122132
}
123133

@@ -138,7 +148,7 @@ func (ed *ExternalDatabase) ApplySchema(ctx context.Context, schema string, sql
138148
// Execute the SQL directly
139149
// Note: Desired state SQL should never contain operations like CREATE INDEX CONCURRENTLY
140150
// that cannot run in transactions. Those are migration details, not state declarations.
141-
if _, err := util.ExecContextWithLogging(ctx, ed.db, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
151+
if _, err := util.ExecContextWithLogging(ctx, conn, schemaAgnosticSQL, "apply desired state SQL to temporary schema"); err != nil {
142152
return fmt.Errorf("failed to apply schema SQL to temporary schema %s: %w", ed.tempSchema, err)
143153
}
144154

0 commit comments

Comments
 (0)