Skip to content

Commit fd028a6

Browse files
committed
Add initial logical replication handling
1 parent cafe112 commit fd028a6

6 files changed

Lines changed: 510 additions & 30 deletions

File tree

spec/pg/replication_spec.cr

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
require "../spec_helper"
2+
3+
struct TestMessageHandler
4+
include PG::Replication::Handler
5+
6+
def received(msg : PG::Replication::Begin)
7+
pp begin: msg
8+
end
9+
10+
def received(msg : PG::Replication::Message)
11+
pp message: msg
12+
end
13+
14+
def received(msg : PG::Replication::Commit)
15+
pp commit: msg
16+
end
17+
18+
def received(msg : PG::Replication::Origin)
19+
end
20+
21+
def received(msg : PG::Replication::Relation)
22+
pp relation: msg
23+
end
24+
25+
def received(msg : PG::Replication::Type)
26+
end
27+
28+
def received(msg : PG::Replication::Insert)
29+
pp insert: msg
30+
end
31+
32+
def received(msg : PG::Replication::Update)
33+
pp update: msg
34+
end
35+
36+
def received(msg : PG::Replication::Delete)
37+
pp delete: msg
38+
end
39+
40+
def received(msg : PG::Replication::Truncate)
41+
end
42+
43+
def received(msg : PG::Replication::StreamStart)
44+
end
45+
46+
def received(msg : PG::Replication::StreamStop)
47+
end
48+
49+
def received(msg : PG::Replication::StreamCommit)
50+
end
51+
52+
def received(msg : PG::Replication::StreamAbort)
53+
end
54+
55+
def received(msg : PG::Replication::BeginPrepare)
56+
end
57+
58+
def received(msg : PG::Replication::Prepare)
59+
end
60+
61+
def received(msg : PG::Replication::CommitPrepared)
62+
end
63+
64+
def received(msg : PG::Replication::RollbackPrepared)
65+
end
66+
67+
def received(msg : PG::Replication::StreamPrepare)
68+
end
69+
70+
def received(msg : PG::Replication::TupleData)
71+
end
72+
end
73+
74+
describe PG::Replication do
75+
it "consumes the WAL" do
76+
publication_name = "test_publication_#{Random::Secure.hex}"
77+
slot_name = "test_slot_#{Random::Secure.hex}"
78+
table_name = "test_table_#{Random::Secure.hex}"
79+
handler = TestMessageHandler.new
80+
PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES"
81+
PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name
82+
subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name)
83+
sleep 100.milliseconds
84+
PG_DB.exec "DROP TABLE IF EXISTS #{table_name}"
85+
PG_DB.exec <<-SQL
86+
CREATE TABLE #{table_name}(
87+
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
88+
string TEXT,
89+
number INT8
90+
)
91+
SQL
92+
id = PG_DB.query_one <<-SQL, as: UUID
93+
INSERT INTO #{table_name} (string, number)
94+
VALUES ('foo', 1)
95+
RETURNING id
96+
SQL
97+
98+
PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id
99+
PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id
100+
101+
sleep 100.milliseconds
102+
103+
pp handler
104+
ensure
105+
subscriber.try &.close
106+
PG_DB.exec "SELECT pg_drop_replication_slot($1)", slot_name
107+
PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}"
108+
PG_DB.exec "DROP TABLE IF EXISTS #{table_name}"
109+
end
110+
end

src/pg.cr

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ module PG
3131
ListenConnection.new(url, channels, blocking, &blk)
3232
end
3333

34+
def self.connect_replication(url, *, handler, publication_name, slot_name)
35+
Replication::Connection.new(url, handler, publication_name: publication_name, slot_name: slot_name)
36+
end
37+
3438
class ListenConnection
3539
@conn : PG::Connection
3640

src/pg/connection.cr

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module PG
1717
super(options)
1818

1919
begin
20-
@connection.connect
20+
@connection.connect(replication: @connection.conninfo.replication)
2121
rescue ex
2222
raise DB::ConnectionRefused.new(cause: ex)
2323
end
@@ -95,6 +95,14 @@ module PG
9595
end
9696
end
9797

98+
protected def listen_replication(publication_name : String, slot_name : String, blocking : Bool = false, &block : Replication::Frame ->)
99+
if blocking
100+
@connection.start_replication_frame_loop(publication_name, slot_name, &block)
101+
else
102+
spawn { @connection.start_replication_frame_loop(publication_name, slot_name, &block) }
103+
end
104+
end
105+
98106
def version
99107
vers = connection.server_parameters["server_version"].partition(' ').first.split('.').map(&.to_i)
100108
{major: vers[0], minor: vers[1], patch: vers[2]? || 0}

0 commit comments

Comments
 (0)