Skip to content

Commit d378225

Browse files
committed
fix some design issues
Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
1 parent 11738ab commit d378225

2 files changed

Lines changed: 171 additions & 53 deletions

File tree

pkg/compose/plan_executor.go

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sync"
2828

2929
"github.com/compose-spec/compose-go/v2/types"
30+
"github.com/containerd/errdefs"
3031
containerType "github.com/moby/moby/api/types/container"
3132
"github.com/moby/moby/client"
3233
"go.opentelemetry.io/otel"
@@ -43,15 +44,11 @@ import (
4344
type executionState struct {
4445
mu sync.Mutex
4546
containers map[string]Containers // service name -> containers created/updated
46-
networks map[string]string // network key -> ID
47-
volumes map[string]string // volume key -> ID
4847
}
4948

5049
func newExecutionState() *executionState {
5150
return &executionState{
5251
containers: make(map[string]Containers),
53-
networks: make(map[string]string),
54-
volumes: make(map[string]string),
5552
}
5653
}
5754

@@ -79,18 +76,6 @@ func (es *executionState) getContainers(serviceName string) Containers {
7976
return slices.Clone(es.containers[serviceName])
8077
}
8178

82-
func (es *executionState) setNetworkID(key, id string) {
83-
es.mu.Lock()
84-
defer es.mu.Unlock()
85-
es.networks[key] = id
86-
}
87-
88-
func (es *executionState) setVolumeID(key, id string) {
89-
es.mu.Lock()
90-
defer es.mu.Unlock()
91-
es.volumes[key] = id
92-
}
93-
9479
// resolveServiceReferences replaces service references in a ServiceConfig with
9580
// actual container IDs from the execution state. This mirrors the logic in
9681
// convergence.resolveServiceReferences but uses executionState instead.
@@ -157,6 +142,13 @@ func (s *composeService) ExecutePlan(ctx context.Context, project *types.Project
157142
return nil
158143
}
159144

145+
// Validate the plan has no dependency cycles before executing.
146+
// Without this check, a cycle would cause the executor to hang
147+
// indefinitely waiting for operations that can never be scheduled.
148+
if _, err := topologicalSort(plan); err != nil {
149+
return err
150+
}
151+
160152
// Pre-populate execution state with existing containers so that
161153
// resolveServiceReferences can find containers for services not
162154
// included in the plan (e.g. --no-deps scenarios).
@@ -172,7 +164,15 @@ func (s *composeService) ExecutePlan(ctx context.Context, project *types.Project
172164
expect := len(plan.Operations)
173165
eg, ctx := errgroup.WithContext(ctx)
174166
opCh := make(chan *Operation, expect)
175-
defer close(opCh)
167+
168+
// sendDone sends a completed operation to the consumer goroutine,
169+
// respecting context cancellation to avoid blocking or panicking.
170+
sendDone := func(op *Operation) {
171+
select {
172+
case opCh <- op:
173+
case <-ctx.Done():
174+
}
175+
}
176176

177177
// Consumer goroutine: waits for completed ops and enqueues newly-ready dependents
178178
eg.Go(func() error {
@@ -195,7 +195,7 @@ func (s *composeService) ExecutePlan(ctx context.Context, project *types.Project
195195
if err := s.executeOperation(ctx, project, depOp, state); err != nil {
196196
return err
197197
}
198-
opCh <- depOp
198+
sendDone(depOp)
199199
return nil
200200
})
201201
}
@@ -210,7 +210,7 @@ func (s *composeService) ExecutePlan(ctx context.Context, project *types.Project
210210
if err := s.executeOperation(ctx, project, op, state); err != nil {
211211
return err
212212
}
213-
opCh <- op
213+
sendDone(op)
214214
return nil
215215
})
216216
}
@@ -245,15 +245,15 @@ func (s *composeService) executeOperation(ctx context.Context, project *types.Pr
245245
func (s *composeService) dispatchOperation(ctx context.Context, project *types.Project, op *Operation, state *executionState) error {
246246
switch op.Type {
247247
case OpCreateNetwork:
248-
return s.executePlanCreateNetwork(ctx, project, op, state)
248+
return s.executePlanCreateNetwork(ctx, project, op)
249249
case OpRemoveNetwork:
250250
return s.executePlanRemoveNetwork(ctx, project, op)
251251
case OpDisconnectNetwork:
252252
return s.executePlanDisconnectNetwork(ctx, op)
253253
case OpConnectNetwork:
254254
return s.executePlanConnectNetwork(ctx, op)
255255
case OpCreateVolume:
256-
return s.executePlanCreateVolume(ctx, project, op, state)
256+
return s.executePlanCreateVolume(ctx, project, op)
257257
case OpRemoveVolume:
258258
return s.executePlanRemoveVolume(ctx, op)
259259
case OpCreateContainer:
@@ -273,13 +273,9 @@ func (s *composeService) dispatchOperation(ctx context.Context, project *types.P
273273
}
274274
}
275275

276-
func (s *composeService) executePlanCreateNetwork(ctx context.Context, project *types.Project, op *Operation, state *executionState) error {
277-
id, err := s.ensureNetwork(ctx, project, op.NetworkOp.NetworkKey, op.NetworkOp.Desired)
278-
if err != nil {
279-
return err
280-
}
281-
state.setNetworkID(op.NetworkOp.NetworkKey, id)
282-
return nil
276+
func (s *composeService) executePlanCreateNetwork(ctx context.Context, project *types.Project, op *Operation) error {
277+
_, err := s.ensureNetwork(ctx, project, op.NetworkOp.NetworkKey, op.NetworkOp.Desired)
278+
return err
283279
}
284280

285281
func (s *composeService) executePlanRemoveNetwork(ctx context.Context, project *types.Project, op *Operation) error {
@@ -301,17 +297,13 @@ func (s *composeService) executePlanConnectNetwork(ctx context.Context, op *Oper
301297
return err
302298
}
303299

304-
func (s *composeService) executePlanCreateVolume(ctx context.Context, project *types.Project, op *Operation, state *executionState) error {
300+
func (s *composeService) executePlanCreateVolume(ctx context.Context, project *types.Project, op *Operation) error {
305301
volume := *op.VolumeOp.Desired
306302
volume.CustomLabels = volume.CustomLabels.Add(api.VolumeLabel, op.VolumeOp.VolumeKey)
307303
volume.CustomLabels = volume.CustomLabels.Add(api.ProjectLabel, project.Name)
308304
volume.CustomLabels = volume.CustomLabels.Add(api.VersionLabel, api.ComposeVersion)
309-
id, err := s.ensureVolume(ctx, op.VolumeOp.VolumeKey, volume, project)
310-
if err != nil {
311-
return err
312-
}
313-
state.setVolumeID(op.VolumeOp.VolumeKey, id)
314-
return nil
305+
_, err := s.ensureVolume(ctx, op.VolumeOp.VolumeKey, volume, project)
306+
return err
315307
}
316308

317309
func (s *composeService) executePlanRemoveVolume(ctx context.Context, op *Operation) error {
@@ -410,8 +402,18 @@ func (s *composeService) executePlanStopContainer(ctx context.Context, op *Opera
410402
}
411403

412404
func (s *composeService) executePlanRemoveContainer(ctx context.Context, op *Operation) error {
413-
service := op.ContainerOp.Service
414-
return s.stopAndRemoveContainer(ctx, *op.ContainerOp.Existing, &service, op.ContainerOp.Timeout, false)
405+
ctr := *op.ContainerOp.Existing
406+
eventName := getContainerProgressName(ctr)
407+
s.events.On(removingEvent(eventName))
408+
_, err := s.apiClient().ContainerRemove(ctx, ctr.ID, client.ContainerRemoveOptions{
409+
Force: true,
410+
})
411+
if err != nil && !errdefs.IsNotFound(err) && !errdefs.IsConflict(err) {
412+
s.events.On(errorEvent(eventName, "Error while Removing"))
413+
return err
414+
}
415+
s.events.On(removedEvent(eventName))
416+
return nil
415417
}
416418

417419
func (s *composeService) executePlanRunPlugin(ctx context.Context, project *types.Project, op *Operation) error {

pkg/compose/reconcile.go

Lines changed: 132 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,11 @@ type ContainerOperation struct {
125125
Existing *container.Summary
126126
Inherit bool
127127
Timeout *time.Duration
128+
NetworkRecreate bool // true when this op was created for a network recreation
128129
}
129130

130131
// RenameOperation holds details for renaming a container.
131132
type RenameOperation struct {
132-
ContainerID string
133133
CurrentName string
134134
NewName string
135135
}
@@ -303,6 +303,14 @@ func Reconcile(project *types.Project, observed *ObservedState, opts ReconcileOp
303303
return nil, err
304304
}
305305

306+
// Step 3b - Clean up stale network disconnect/connect operations.
307+
// When a container is being recreated (has a rename op in the plan),
308+
// any disconnect/connect ops created by reconcileNetworks use the old
309+
// container ID which will be stale after recreation. The new container
310+
// will be connected to the recreated network automatically via
311+
// buildDependencyEdges (create-container depends on create-network).
312+
pruneStaleNetworkOpsForRecreatedContainers(plan)
313+
306314
// Step 4 - Orphans
307315
if opts.RemoveOrphans {
308316
reconcileOrphans(observed.Orphans, plan, opts)
@@ -446,9 +454,10 @@ func reconcileNetworks(project *types.Project, observed *ObservedState, plan *Re
446454
ServiceName: ctr.Labels[api.ServiceLabel],
447455
Resource: ctrName,
448456
ContainerOp: &ContainerOperation{
449-
ContainerName: ctrName,
450-
Existing: &ctr,
451-
Timeout: opts.Timeout,
457+
ContainerName: ctrName,
458+
Existing: &ctr,
459+
Timeout: opts.Timeout,
460+
NetworkRecreate: true,
452461
},
453462
Reason: fmt.Sprintf("network %q is being recreated", n.Name),
454463
}
@@ -1228,25 +1237,132 @@ func addCascadingRestarts(project *types.Project, observed *ObservedState, plan
12281237
}
12291238

12301239
startID := fmt.Sprintf("start-container:%s", ctrName)
1231-
plan.Operations[startID] = &Operation{
1232-
ID: startID,
1233-
Type: OpStartContainer,
1234-
ServiceName: service.Name,
1235-
Resource: ctrName,
1236-
ContainerOp: &ContainerOperation{
1237-
Service: service,
1238-
ContainerName: ctrName,
1239-
Existing: &ctr,
1240-
},
1241-
DependsOn: []string{stopID},
1242-
Reason: fmt.Sprintf("restart after dependency %q recreated", depName),
1240+
if existingStart, exists := plan.Operations[startID]; exists {
1241+
// A start operation already exists (e.g. from network recreation).
1242+
// Add our stop as a dependency instead of overwriting.
1243+
if !slices.Contains(existingStart.DependsOn, stopID) {
1244+
existingStart.DependsOn = append(existingStart.DependsOn, stopID)
1245+
}
1246+
} else {
1247+
plan.Operations[startID] = &Operation{
1248+
ID: startID,
1249+
Type: OpStartContainer,
1250+
ServiceName: service.Name,
1251+
Resource: ctrName,
1252+
ContainerOp: &ContainerOperation{
1253+
Service: service,
1254+
ContainerName: ctrName,
1255+
Existing: &ctr,
1256+
},
1257+
DependsOn: []string{stopID},
1258+
Reason: fmt.Sprintf("restart after dependency %q recreated", depName),
1259+
}
12431260
}
12441261
}
12451262
break // Only need to process once per service
12461263
}
12471264
}
12481265
}
12491266

1267+
// pruneStaleNetworkOpsForRecreatedContainers removes disconnect/connect/start
1268+
// operations that were created by reconcileNetworks for containers that are
1269+
// also being recreated. When a container is recreated, the old container ID
1270+
// used in connect/disconnect ops becomes stale. The new container will
1271+
// automatically connect to the new network because buildDependencyEdges
1272+
// ensures create-container depends on create-network.
1273+
func pruneStaleNetworkOpsForRecreatedContainers(plan *ReconciliationPlan) {
1274+
recreatedContainers := collectRecreatedContainerNames(plan)
1275+
if len(recreatedContainers) == 0 {
1276+
return
1277+
}
1278+
1279+
toDelete := findStaleNetworkOps(plan, recreatedContainers)
1280+
deletePlanOps(plan, toDelete)
1281+
}
1282+
1283+
// collectRecreatedContainerNames returns the final names of containers that
1284+
// have a recreate chain (identified by rename operations).
1285+
func collectRecreatedContainerNames(plan *ReconciliationPlan) map[string]bool {
1286+
result := map[string]bool{}
1287+
for _, op := range plan.Operations {
1288+
if op.Type == OpRenameContainer && op.RenameOp != nil {
1289+
result[op.RenameOp.NewName] = true
1290+
}
1291+
}
1292+
return result
1293+
}
1294+
1295+
// findStaleNetworkOps identifies disconnect/connect/start/stop operations that
1296+
// reference containers being recreated. These ops use stale container IDs.
1297+
func findStaleNetworkOps(plan *ReconciliationPlan, recreated map[string]bool) []string {
1298+
var toDelete []string
1299+
for id, op := range plan.Operations {
1300+
switch op.Type {
1301+
case OpDisconnectNetwork, OpConnectNetwork:
1302+
if isNetworkOpForRecreatedContainer(op, recreated) {
1303+
toDelete = append(toDelete, id)
1304+
}
1305+
case OpStartContainer:
1306+
if isNetworkStartForRecreatedContainer(plan, op, recreated) {
1307+
toDelete = append(toDelete, id)
1308+
}
1309+
case OpStopContainer:
1310+
if isNetworkStopForRecreatedContainer(op, recreated) {
1311+
toDelete = append(toDelete, id)
1312+
}
1313+
}
1314+
}
1315+
return toDelete
1316+
}
1317+
1318+
func isNetworkOpForRecreatedContainer(op *Operation, recreated map[string]bool) bool {
1319+
if op.ContainerNetworkOp == nil {
1320+
return false
1321+
}
1322+
parts := strings.Fields(op.Resource)
1323+
return len(parts) > 0 && recreated[parts[0]]
1324+
}
1325+
1326+
func isNetworkStartForRecreatedContainer(plan *ReconciliationPlan, op *Operation, recreated map[string]bool) bool {
1327+
if op.ContainerOp == nil || !recreated[op.ContainerOp.ContainerName] {
1328+
return false
1329+
}
1330+
// Only remove start ops created by network recreation (depend on a connect op),
1331+
// not the start from the recreate chain itself (depends on a rename op).
1332+
for _, depID := range op.DependsOn {
1333+
if dep, ok := plan.Operations[depID]; ok && dep.Type == OpConnectNetwork {
1334+
return true
1335+
}
1336+
}
1337+
return false
1338+
}
1339+
1340+
func isNetworkStopForRecreatedContainer(op *Operation, recreated map[string]bool) bool {
1341+
if op.ContainerOp == nil || !recreated[op.ContainerOp.ContainerName] {
1342+
return false
1343+
}
1344+
// Only remove stop ops from network recreation, not from the recreate chain.
1345+
return op.ContainerOp.NetworkRecreate
1346+
}
1347+
1348+
// deletePlanOps removes operations by ID and cleans up DependsOn references.
1349+
func deletePlanOps(plan *ReconciliationPlan, ids []string) {
1350+
deleted := map[string]bool{}
1351+
for _, id := range ids {
1352+
deleted[id] = true
1353+
delete(plan.Operations, id)
1354+
}
1355+
for _, op := range plan.Operations {
1356+
var cleaned []string
1357+
for _, depID := range op.DependsOn {
1358+
if !deleted[depID] {
1359+
cleaned = append(cleaned, depID)
1360+
}
1361+
}
1362+
op.DependsOn = cleaned
1363+
}
1364+
}
1365+
12501366
// serviceUsesRecreatedVolume checks if a service mounts any volume that is
12511367
// being recreated, and returns a reason string if so.
12521368
func serviceUsesRecreatedVolume(service types.ServiceConfig, recreatedVolumes map[string]bool) string {

0 commit comments

Comments
 (0)