Skip to content

Commit 32d6d6f

Browse files
Merge pull request #24 from JasperFx/weasel-schema-migration
Migrate all DDL to Weasel SchemaMigration
2 parents c6ca2ee + 3b0787f commit 32d6d6f

10 files changed

Lines changed: 127 additions & 191 deletions

File tree

src/Polecat.Tests/Storage/document_foreign_key_tests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ await StoreOptions(opts =>
216216
{
217217
ConnectionString = theStore.Options.ConnectionString,
218218
AutoCreateSchemaObjects = JasperFx.AutoCreate.All,
219-
DatabaseSchemaName = "fk_idempotent"
219+
DatabaseSchemaName = "fk_idempotent",
220+
UseNativeJsonType = ConnectionSource.SupportsNativeJson
220221
};
221222
opts2.Schema.For<FkUser>();
222223
opts2.Schema.For<FkIssue>().ForeignKey<FkUser>(x => x.AssigneeId);

src/Polecat.Tests/Storage/document_index_tests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ await StoreOptions(opts =>
285285
{
286286
ConnectionString = theStore.Options.ConnectionString,
287287
AutoCreateSchemaObjects = JasperFx.AutoCreate.All,
288-
DatabaseSchemaName = "idx_idempotent"
288+
DatabaseSchemaName = "idx_idempotent",
289+
UseNativeJsonType = ConnectionSource.SupportsNativeJson
289290
};
290291
opts2.Schema.For<IndexedProduct>().Index(x => x.Sku);
291292
using var store2 = new DocumentStore(opts2);

src/Polecat/AdvancedOperations.cs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -283,35 +283,26 @@ await _resilience.ExecuteAsync(static async (state, ct) =>
283283
}
284284

285285
/// <summary>
286-
/// Generate the full DDL script for all Polecat schema objects (event store tables + any registered document tables).
286+
/// Generate the full DDL script for all Polecat schema objects (event store tables, document tables, HiLo).
287287
/// </summary>
288288
public string ToDatabaseScript()
289289
{
290290
var sb = new StringBuilder();
291291
var writer = new StringWriter(sb);
292+
var migrator = new Weasel.SqlServer.SqlServerMigrator();
292293

293-
// Event store tables via Weasel
294+
// All schema objects (event store + documents + hilo) via Weasel feature schemas
294295
foreach (var featureSchema in _store.Database.BuildFeatureSchemas())
295296
{
296297
foreach (var schemaObject in featureSchema.Objects)
297298
{
298-
schemaObject.WriteCreateStatement(new Weasel.SqlServer.SqlServerMigrator(), writer);
299+
schemaObject.WriteCreateStatement(migrator, writer);
299300
writer.WriteLine();
300301
writer.WriteLine("GO");
301302
writer.WriteLine();
302303
}
303304
}
304305

305-
// Document tables for already-registered providers
306-
foreach (var provider in _store.Options.Providers.AllProviders)
307-
{
308-
var table = new DocumentTable(provider.Mapping);
309-
table.WriteCreateStatement(new Weasel.SqlServer.SqlServerMigrator(), writer);
310-
writer.WriteLine();
311-
writer.WriteLine("GO");
312-
writer.WriteLine();
313-
}
314-
315306
return sb.ToString();
316307
}
317308

Lines changed: 17 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
using System.Collections.Concurrent;
2+
using JasperFx;
23
using Microsoft.Data.SqlClient;
3-
using Polecat.Metadata;
4+
using Polecat.Schema.Identity.Sequences;
45
using Polecat.Storage;
6+
using Weasel.Core;
7+
using Weasel.SqlServer;
58

69
namespace Polecat.Internal;
710

811
/// <summary>
9-
/// Ensures document tables exist on demand. Uses Weasel to create tables
10-
/// if they don't exist, and tracks which types have been ensured.
12+
/// Ensures document tables exist on demand. Uses Weasel SchemaMigration to create
13+
/// or update tables, and tracks which types have been ensured.
1114
/// </summary>
1215
internal class DocumentTableEnsurer
1316
{
@@ -49,35 +52,29 @@ public async Task EnsureTableAsync(DocumentProvider provider, CancellationToken
4952
return;
5053
}
5154

52-
var table = new DocumentTable(provider.Mapping);
53-
5455
await using var conn = _connectionFactory.Create();
5556
await conn.OpenAsync(token);
5657

58+
var migrator = new SqlServerMigrator();
59+
5760
// Ensure pc_hilo table for numeric ID types
5861
if (provider.Mapping.IsNumericId && !_hiloTableEnsured)
5962
{
60-
var hiloDdl = BuildHiloTableDdl(provider.Mapping.DatabaseSchemaName);
61-
await using var hiloCmd = conn.CreateCommand();
62-
hiloCmd.CommandText = hiloDdl;
63-
await hiloCmd.ExecuteNonQueryAsync(token);
63+
var hiloTable = new HiloTable(provider.Mapping.DatabaseSchemaName);
64+
var hiloMigration = await SchemaMigration.DetermineAsync(conn, token, hiloTable);
65+
await migrator.ApplyAllAsync(conn, hiloMigration, AutoCreate.CreateOrUpdate, ct: token);
6466
_hiloTableEnsured = true;
6567
}
6668

67-
// Use raw DDL with IF NOT EXISTS for safety
68-
var ddl = BuildCreateTableDdl(provider.Mapping);
69-
await using var cmd = conn.CreateCommand();
70-
cmd.CommandText = ddl;
71-
await cmd.ExecuteNonQueryAsync(token);
72-
73-
// Ensure created_at column exists (migration for tables created before this column was added)
74-
await using var migrateCmd = conn.CreateCommand();
75-
migrateCmd.CommandText = BuildAddMissingColumnsDdl(provider.Mapping);
76-
await migrateCmd.ExecuteNonQueryAsync(token);
69+
// Use Weasel SchemaMigration to create or update the document table
70+
var table = new DocumentTable(provider.Mapping);
71+
var migration = await SchemaMigration.DetermineAsync(conn, token, table);
72+
await migrator.ApplyAllAsync(conn, migration, AutoCreate.CreateOrUpdate, ct: token);
7773

7874
// Create custom indexes (computed columns + index)
75+
// Computed columns are not modeled in Weasel, so they remain as supplementary DDL.
7976
// Each statement executed separately so computed columns are visible
80-
// before filtered indexes reference them
77+
// before filtered indexes reference them.
8178
foreach (var index in provider.Mapping.Indexes)
8279
{
8380
foreach (var statement in index.ToDdlStatements(provider.Mapping))
@@ -147,106 +144,4 @@ public async Task EnsureTablesAsync(IEnumerable<DocumentProvider> providers, Can
147144
await EnsureTableAsync(provider, token);
148145
}
149146
}
150-
151-
private static string BuildHiloTableDdl(string schema)
152-
{
153-
return $"""
154-
IF NOT EXISTS (SELECT 1 FROM sys.tables t
155-
JOIN sys.schemas s ON t.schema_id = s.schema_id
156-
WHERE s.name = '{schema}' AND t.name = 'pc_hilo')
157-
BEGIN
158-
IF SCHEMA_ID('{schema}') IS NULL
159-
EXEC('CREATE SCHEMA [{schema}]');
160-
161-
CREATE TABLE [{schema}].[pc_hilo] (
162-
entity_name varchar(250) NOT NULL PRIMARY KEY,
163-
hi_value bigint NOT NULL DEFAULT 0
164-
);
165-
END
166-
""";
167-
}
168-
169-
private static string BuildCreateTableDdl(DocumentMapping mapping)
170-
{
171-
var schema = mapping.DatabaseSchemaName;
172-
var table = mapping.TableName;
173-
var innerIdType = mapping.InnerIdType;
174-
var idType = innerIdType == typeof(Guid) ? "uniqueidentifier"
175-
: innerIdType == typeof(int) ? "int"
176-
: innerIdType == typeof(long) ? "bigint"
177-
: "varchar(250)";
178-
var isConjoined = mapping.TenancyStyle == TenancyStyle.Conjoined;
179-
var isSoftDelete = mapping.DeleteStyle == DeleteStyle.SoftDelete;
180-
var softDeleteCols = isSoftDelete
181-
? @"
182-
is_deleted bit NOT NULL DEFAULT 0,
183-
deleted_at datetimeoffset NULL,"
184-
: "";
185-
var guidVersionCol = mapping.UseOptimisticConcurrency
186-
? @"
187-
guid_version uniqueidentifier NOT NULL DEFAULT NEWID(),"
188-
: "";
189-
var docTypeCol = mapping.IsHierarchy()
190-
? @"
191-
doc_type varchar(250) NOT NULL DEFAULT 'base',"
192-
: "";
193-
194-
if (isConjoined)
195-
{
196-
return $@"
197-
IF NOT EXISTS (SELECT 1 FROM sys.tables t
198-
JOIN sys.schemas s ON t.schema_id = s.schema_id
199-
WHERE s.name = '{schema}' AND t.name = '{table}')
200-
BEGIN
201-
IF SCHEMA_ID('{schema}') IS NULL
202-
EXEC('CREATE SCHEMA [{schema}]');
203-
204-
CREATE TABLE [{schema}].[{table}] (
205-
tenant_id varchar(250) NOT NULL,
206-
id {idType} NOT NULL,
207-
data nvarchar(max) NOT NULL,
208-
version int NOT NULL DEFAULT 1,
209-
last_modified datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET(),
210-
created_at datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET(),
211-
dotnet_type varchar(500) NULL,{docTypeCol}{softDeleteCols}{guidVersionCol}
212-
CONSTRAINT pk_{table} PRIMARY KEY (tenant_id, id)
213-
);
214-
END";
215-
}
216-
217-
return $@"
218-
IF NOT EXISTS (SELECT 1 FROM sys.tables t
219-
JOIN sys.schemas s ON t.schema_id = s.schema_id
220-
WHERE s.name = '{schema}' AND t.name = '{table}')
221-
BEGIN
222-
IF SCHEMA_ID('{schema}') IS NULL
223-
EXEC('CREATE SCHEMA [{schema}]');
224-
225-
CREATE TABLE [{schema}].[{table}] (
226-
id {idType} NOT NULL PRIMARY KEY,
227-
data nvarchar(max) NOT NULL,
228-
version int NOT NULL DEFAULT 1,
229-
last_modified datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET(),
230-
created_at datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET(),
231-
dotnet_type varchar(500) NULL,{docTypeCol}{softDeleteCols}{guidVersionCol}
232-
tenant_id varchar(250) NOT NULL DEFAULT '*DEFAULT*'
233-
);
234-
END";
235-
}
236-
237-
private static string BuildAddMissingColumnsDdl(DocumentMapping mapping)
238-
{
239-
var schema = mapping.DatabaseSchemaName;
240-
var table = mapping.TableName;
241-
return $"""
242-
IF NOT EXISTS (SELECT 1 FROM sys.columns
243-
WHERE object_id = OBJECT_ID('[{schema}].[{table}]')
244-
AND name = 'created_at')
245-
BEGIN
246-
ALTER TABLE [{schema}].[{table}]
247-
ADD created_at datetimeoffset NOT NULL DEFAULT SYSDATETIMEOFFSET();
248-
END
249-
""";
250-
}
251-
252147
}

src/Polecat/Projections/Flattened/FlatTableProjection.cs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -232,40 +232,15 @@ public ISubscriptionExecution BuildExecution(
232232

233233
private async Task EnsureTableAsync(DocumentSessionBase session, CancellationToken cancellation)
234234
{
235-
await using var cmd = new Microsoft.Data.SqlClient.SqlCommand();
236-
cmd.CommandText = $"""
237-
IF NOT EXISTS (SELECT * FROM sys.tables t
238-
JOIN sys.schemas s ON t.schema_id = s.schema_id
239-
WHERE s.name = '{Table.Identifier.Schema}' AND t.name = '{Table.Identifier.Name}')
240-
BEGIN
241-
{GenerateCreateTableDdl()}
242-
END
243-
""";
235+
var migrator = new SqlServerMigrator();
236+
var writer = new StringWriter();
237+
Table.WriteCreateStatement(migrator, writer);
244238

239+
await using var cmd = new SqlCommand();
240+
cmd.CommandText = writer.ToString();
245241
await session.ExecuteAsync(cmd, cancellation);
246242
}
247243

248-
private string GenerateCreateTableDdl()
249-
{
250-
var columns = Table.Columns.Select(c =>
251-
{
252-
var nullable = c.AllowNulls && !c.IsPrimaryKey ? "NULL" : "NOT NULL";
253-
var identity = c.IsAutoNumber ? " IDENTITY(1,1)" : "";
254-
return $"[{c.Name}] {c.Type}{identity} {nullable}";
255-
});
256-
257-
var pkColumns = Table.PrimaryKeyColumns;
258-
var pkConstraint = pkColumns.Count > 0
259-
? $", CONSTRAINT [PK_{Table.Identifier.Name}] PRIMARY KEY ({string.Join(", ", pkColumns.Select(c => $"[{c}]"))})"
260-
: "";
261-
262-
return $"""
263-
CREATE TABLE [{Table.Identifier.Schema}].[{Table.Identifier.Name}] (
264-
{string.Join(",\n ", columns)}{pkConstraint}
265-
);
266-
""";
267-
}
268-
269244
private static MemberInfo[]? GetMemberPath<T>(Expression<Func<T, object>> expression)
270245
{
271246
var body = expression.Body;

src/Polecat/Schema/Identity/Sequences/HiloSequence.cs

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
using JasperFx;
12
using Microsoft.Data.SqlClient;
23
using Polly;
34
using Polecat.Exceptions;
45
using Polecat.Internal;
6+
using Weasel.Core;
7+
using Weasel.SqlServer;
58

69
namespace Polecat.Schema.Identity.Sequences;
710

@@ -119,40 +122,24 @@ private async Task EnsureHiloTableAsync(SqlConnection conn, CancellationToken ct
119122
{
120123
if (_tableEnsured) return;
121124

122-
await using var cmd = conn.CreateCommand();
123-
cmd.CommandText = HiloTableDdl();
124-
await cmd.ExecuteNonQueryAsync(ct);
125+
var table = new HiloTable(_schemaName);
126+
var migrator = new SqlServerMigrator();
127+
var migration = await SchemaMigration.DetermineAsync(conn, ct, table);
128+
await migrator.ApplyAllAsync(conn, migration, AutoCreate.CreateOrUpdate, ct: ct);
125129
_tableEnsured = true;
126130
}
127131

128132
private void EnsureHiloTableSync(SqlConnection conn)
129133
{
130134
if (_tableEnsured) return;
131135

132-
using var cmd = conn.CreateCommand();
133-
cmd.CommandText = HiloTableDdl();
134-
cmd.ExecuteNonQuery();
136+
var table = new HiloTable(_schemaName);
137+
var migrator = new SqlServerMigrator();
138+
var migration = SchemaMigration.DetermineAsync(conn, table).GetAwaiter().GetResult();
139+
migrator.ApplyAllAsync(conn, migration, AutoCreate.CreateOrUpdate).GetAwaiter().GetResult();
135140
_tableEnsured = true;
136141
}
137142

138-
private string HiloTableDdl()
139-
{
140-
return $"""
141-
IF NOT EXISTS (SELECT 1 FROM sys.tables t
142-
JOIN sys.schemas s ON t.schema_id = s.schema_id
143-
WHERE s.name = '{_schemaName}' AND t.name = 'pc_hilo')
144-
BEGIN
145-
IF SCHEMA_ID('{_schemaName}') IS NULL
146-
EXEC('CREATE SCHEMA [{_schemaName}]');
147-
148-
CREATE TABLE [{_schemaName}].[pc_hilo] (
149-
entity_name varchar(250) NOT NULL PRIMARY KEY,
150-
hi_value bigint NOT NULL DEFAULT 0
151-
);
152-
END
153-
""";
154-
}
155-
156143
private async Task<long> TryGetNextHiAsync(SqlConnection conn, CancellationToken ct)
157144
{
158145
// Read current hi_value
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
using Weasel.SqlServer;
2+
using Weasel.SqlServer.Tables;
3+
4+
namespace Polecat.Schema.Identity.Sequences;
5+
6+
/// <summary>
7+
/// Weasel table definition for pc_hilo — stores HiLo sequence counters for numeric ID generation.
8+
/// </summary>
9+
internal class HiloTable : Table
10+
{
11+
public const string TableName = "pc_hilo";
12+
13+
public HiloTable(string schemaName)
14+
: base(new SqlServerObjectName(schemaName, TableName))
15+
{
16+
AddColumn("entity_name", "varchar(250)").AsPrimaryKey().NotNull();
17+
AddColumn("hi_value", "bigint").NotNull().DefaultValue(0);
18+
}
19+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using Polecat.Schema.Identity.Sequences;
2+
using Weasel.Core;
3+
using Weasel.Core.Migrations;
4+
using Weasel.SqlServer;
5+
6+
namespace Polecat.Storage;
7+
8+
/// <summary>
9+
/// Weasel feature schema that yields all document tables and the HiLo sequence table.
10+
/// Participates in ApplyAllConfiguredChangesToDatabaseAsync() for schema migration.
11+
/// </summary>
12+
internal class DocumentFeatureSchema : FeatureSchemaBase
13+
{
14+
private readonly StoreOptions _options;
15+
16+
public DocumentFeatureSchema(StoreOptions options)
17+
: base("Documents", new SqlServerMigrator())
18+
{
19+
_options = options;
20+
}
21+
22+
public override Type StorageType => typeof(DocumentFeatureSchema);
23+
24+
protected override IEnumerable<ISchemaObject> schemaObjects()
25+
{
26+
// HiLo table first — numeric ID document types depend on it
27+
if (_options.Providers.AllProviders.Any(p => p.Mapping.IsNumericId))
28+
{
29+
yield return new HiloTable(_options.DatabaseSchemaName);
30+
}
31+
32+
foreach (var provider in _options.Providers.AllProviders)
33+
{
34+
yield return new DocumentTable(provider.Mapping);
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)