Skip to content

Commit 0932b5f

Browse files
bricefclaude
andcommitted
Add notification system to MCP server
The MCP server subscribes to the global event stream at startup, filtering out self-events by actor name. Notifications from other actors are buffered (up to 50) and piggybacked onto tool responses as a text block listing what changed since the last tool call. - Notifier with ring buffer, concurrent-safe Drain() - Human-readable notification summaries (created, transitioned, etc.) - withNotifications wrapper applied to all tool handlers - Actor identity resolved at startup via /me for self-filtering - Graceful fallback: if /me fails or no events arrive, tools work normally Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b17307d commit 0932b5f

3 files changed

Lines changed: 197 additions & 9 deletions

File tree

cmd/taskflow-mcp/main.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,22 @@ func main() {
2929
}
3030

3131
client := httpclient.New(serverURL, apiKey)
32-
server := tfmcp.NewServer(client)
32+
33+
// Discover self identity for notification filtering.
34+
actor, err := client.WhoAmI()
35+
if err != nil {
36+
fmt.Fprintf(os.Stderr, "Warning: could not resolve actor identity: %v\n", err)
37+
}
38+
39+
// Start event subscription for notifications.
40+
ctx, cancel := context.WithCancel(context.Background())
41+
defer cancel()
42+
var notifier *tfmcp.Notifier
43+
if actor.Name != "" {
44+
notifier = tfmcp.NewNotifier(ctx, client, actor.Name)
45+
}
46+
47+
server := tfmcp.NewServer(client, notifier)
3348

3449
if err := server.Run(context.Background(), &gomcp.StdioTransport{}); err != nil {
3550
fmt.Fprintf(os.Stderr, "MCP server error: %v\n", err)

internal/mcp/notifications.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/bricef/taskflow/internal/eventbus"
9+
"github.com/bricef/taskflow/internal/httpclient"
10+
)
11+
12+
const maxPendingNotifications = 50
13+
14+
// Notification is a summary of a domain event from another actor.
15+
type Notification struct {
16+
Event string `json:"event"`
17+
Board string `json:"board"`
18+
Task string `json:"task,omitempty"`
19+
Actor string `json:"actor"`
20+
Summary string `json:"summary"`
21+
Timestamp string `json:"timestamp"`
22+
}
23+
24+
// Notifier subscribes to the global event stream and buffers notifications
25+
// from other actors. Pending notifications are retrieved and cleared on each
26+
// tool call via Drain().
27+
type Notifier struct {
28+
mu sync.Mutex
29+
pending []Notification
30+
actorName string
31+
}
32+
33+
// NewNotifier starts a background event subscription and buffers notifications
34+
// from actors other than selfName. Cancel the context to stop.
35+
func NewNotifier(ctx context.Context, client *httpclient.Client, selfName string) *Notifier {
36+
n := &Notifier{actorName: selfName}
37+
38+
stream := client.Subscribe(ctx, httpclient.SubscribeOptions{})
39+
go func() {
40+
for {
41+
select {
42+
case evt, ok := <-stream.Events:
43+
if !ok {
44+
return
45+
}
46+
if evt.Actor.Name == n.actorName {
47+
continue // skip self-events
48+
}
49+
n.add(evt)
50+
case <-stream.Errors:
51+
// Reconnection is handled by httpclient.Subscribe.
52+
// We just ignore errors here.
53+
case <-ctx.Done():
54+
return
55+
}
56+
}
57+
}()
58+
59+
return n
60+
}
61+
62+
func (n *Notifier) add(evt eventbus.Event) {
63+
n.mu.Lock()
64+
defer n.mu.Unlock()
65+
66+
task := ""
67+
snap := evt.After
68+
if snap == nil {
69+
snap = evt.Before
70+
}
71+
if snap != nil {
72+
task = snap.Ref
73+
}
74+
75+
notif := Notification{
76+
Event: evt.Type,
77+
Board: evt.Board.Slug,
78+
Task: task,
79+
Actor: evt.Actor.Name,
80+
Summary: formatNotification(evt),
81+
Timestamp: evt.Timestamp.Format("2006-01-02T15:04:05Z"),
82+
}
83+
84+
n.pending = append(n.pending, notif)
85+
if len(n.pending) > maxPendingNotifications {
86+
n.pending = n.pending[len(n.pending)-maxPendingNotifications:]
87+
}
88+
}
89+
90+
// Drain returns all pending notifications and clears the buffer.
91+
func (n *Notifier) Drain() []Notification {
92+
n.mu.Lock()
93+
defer n.mu.Unlock()
94+
95+
if len(n.pending) == 0 {
96+
return nil
97+
}
98+
result := n.pending
99+
n.pending = nil
100+
return result
101+
}
102+
103+
func formatNotification(evt eventbus.Event) string {
104+
actor := evt.Actor.Name
105+
if actor == "" {
106+
actor = "system"
107+
}
108+
109+
snap := evt.After
110+
if snap == nil {
111+
snap = evt.Before
112+
}
113+
114+
switch evt.Type {
115+
case eventbus.EventTaskCreated:
116+
if snap != nil {
117+
return fmt.Sprintf("%s created %s %q", actor, snap.Ref, snap.Title)
118+
}
119+
case eventbus.EventTaskTransitioned:
120+
if evt.Before != nil && snap != nil {
121+
return fmt.Sprintf("%s moved %s from %s to %s", actor, snap.Ref, evt.Before.State, snap.State)
122+
}
123+
case eventbus.EventTaskAssigned:
124+
if snap != nil && snap.Assignee != nil {
125+
return fmt.Sprintf("%s assigned %s to %s", actor, snap.Ref, *snap.Assignee)
126+
}
127+
case eventbus.EventTaskCommented:
128+
if snap != nil {
129+
return fmt.Sprintf("%s commented on %s", actor, snap.Ref)
130+
}
131+
case eventbus.EventTaskDeleted:
132+
if evt.Before != nil {
133+
return fmt.Sprintf("%s deleted %s", actor, evt.Before.Ref)
134+
}
135+
}
136+
137+
if snap != nil {
138+
return fmt.Sprintf("%s: %s on %s", actor, evt.Type, snap.Ref)
139+
}
140+
return fmt.Sprintf("%s: %s", actor, evt.Type)
141+
}

internal/mcp/server.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ import (
1717
)
1818

1919
// NewServer creates an MCP server with all TaskFlow resources and tools registered.
20-
func NewServer(client *httpclient.Client) *gomcp.Server {
20+
// If notifier is non-nil, tool responses include pending notifications from other actors.
21+
func NewServer(client *httpclient.Client, notifier *Notifier) *gomcp.Server {
2122
server := gomcp.NewServer(&gomcp.Implementation{
2223
Name: "taskflow",
2324
Version: "1.0.0",
2425
}, nil)
2526

2627
registerResources(server, client)
27-
registerTools(server, client)
28-
registerWhoAmI(server, client)
28+
registerTools(server, client, notifier)
29+
registerWhoAmI(server, client, notifier)
2930

3031
return server
3132
}
@@ -78,7 +79,7 @@ func resourceHandler(client *httpclient.Client, res model.Resource) gomcp.Resour
7879

7980
// --- Tools ---
8081

81-
func registerTools(server *gomcp.Server, client *httpclient.Client) {
82+
func registerTools(server *gomcp.Server, client *httpclient.Client, notifier *Notifier) {
8283
for _, op := range model.Operations() {
8384
op := op
8485
server.AddTool(
@@ -87,7 +88,7 @@ func registerTools(server *gomcp.Server, client *httpclient.Client) {
8788
Description: descriptionOf(op.Summary, op.Description),
8889
InputSchema: buildInputSchema(op),
8990
},
90-
toolHandler(client, op),
91+
withNotifications(notifier, toolHandler(client, op)),
9192
)
9293
}
9394
}
@@ -154,14 +155,14 @@ func toolHandler(client *httpclient.Client, op model.Operation) gomcp.ToolHandle
154155

155156
// --- WhoAmI ---
156157

157-
func registerWhoAmI(server *gomcp.Server, client *httpclient.Client) {
158+
func registerWhoAmI(server *gomcp.Server, client *httpclient.Client, notifier *Notifier) {
158159
server.AddTool(
159160
&gomcp.Tool{
160161
Name: "whoami",
161162
Description: "Returns your actor identity (name, role, type) as determined by your API key.",
162163
InputSchema: map[string]any{"type": "object", "properties": map[string]any{}},
163164
},
164-
func(ctx context.Context, req *gomcp.CallToolRequest) (*gomcp.CallToolResult, error) {
165+
withNotifications(notifier, func(ctx context.Context, req *gomcp.CallToolRequest) (*gomcp.CallToolResult, error) {
165166
actor, err := client.WithContext(ctx).WhoAmI()
166167
if err != nil {
167168
return &gomcp.CallToolResult{
@@ -173,10 +174,41 @@ func registerWhoAmI(server *gomcp.Server, client *httpclient.Client) {
173174
return &gomcp.CallToolResult{
174175
Content: []gomcp.Content{&gomcp.TextContent{Text: string(data)}},
175176
}, nil
176-
},
177+
}),
177178
)
178179
}
179180

181+
// --- Notification piggyback ---
182+
183+
// withNotifications wraps a tool handler to append pending notifications
184+
// to the response. If notifier is nil, the handler is returned unchanged.
185+
func withNotifications(notifier *Notifier, handler gomcp.ToolHandler) gomcp.ToolHandler {
186+
if notifier == nil {
187+
return handler
188+
}
189+
return func(ctx context.Context, req *gomcp.CallToolRequest) (*gomcp.CallToolResult, error) {
190+
result, err := handler(ctx, req)
191+
if err != nil || result == nil {
192+
return result, err
193+
}
194+
195+
notifications := notifier.Drain()
196+
if len(notifications) == 0 {
197+
return result, nil
198+
}
199+
200+
// Append notifications as a separate text block.
201+
var summary strings.Builder
202+
summary.WriteString(fmt.Sprintf("\n--- %d notification(s) from other actors ---\n", len(notifications)))
203+
for _, n := range notifications {
204+
summary.WriteString(fmt.Sprintf(" [%s] %s\n", n.Timestamp, n.Summary))
205+
}
206+
207+
result.Content = append(result.Content, &gomcp.TextContent{Text: summary.String()})
208+
return result, nil
209+
}
210+
}
211+
180212
// --- Schema generation ---
181213

182214
func buildInputSchema(op model.Operation) map[string]any {

0 commit comments

Comments
 (0)