Skip to content

Commit 16faad6

Browse files
committed
Harden replication transport lifecycle and restart recovery
- add socket-aware stream polling - normalize idle get_copy_data responses - improve reconnect and recovery behavior - add PostgreSQL restart recovery E2E coverage - fix replication timeout recovery edge cases - improve E2E startup readiness checks - harden replication slot lifecycle handling - expand YARD documentation coverage
1 parent 23223a8 commit 16faad6

13 files changed

Lines changed: 161 additions & 14 deletions

File tree

‎CHANGELOG.md‎

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,35 @@
22

33
## Unreleased
44

5+
## 0.2.4 - 2026-06-17
6+
7+
### Added
8+
9+
- Added socket-aware replication stream polling.
10+
- Added E2E coverage for PostgreSQL restart and replication stream recovery.
11+
- Added connection readiness checks to E2E infrastructure.
12+
13+
### Changed
14+
15+
- Improved replication stream handling during idle periods.
16+
- Improved reconnect behavior after PostgreSQL restarts.
17+
- Improved standby feedback reliability during long-running streams.
18+
- Improved E2E test stability across PostgreSQL startup and restart scenarios.
19+
- Normalized `PG#get_copy_data` idle responses to simplify stream processing.
20+
21+
### Fixed
22+
23+
- Fixed replication stream recovery after PostgreSQL restart.
24+
- Fixed handling of idle COPY stream reads.
25+
- Fixed reconnect loops triggered by PostgreSQL replication timeouts.
26+
- Fixed E2E race conditions during PostgreSQL initialization.
27+
- Fixed replication slot creation behavior when a slot already exists.
28+
- Fixed several edge cases uncovered by Mammoth integration testing.
29+
30+
### Documentation
31+
32+
- Expanded YARD documentation coverage to 98.95%.
33+
534
## 0.2.3 - 2026-06-17
635

736
### Fixed

‎README.md‎

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,57 @@ Equivalent Rake task:
363363
bundle exec rake e2e:run
364364
```
365365

366+
## Transport lifecycle behavior
367+
368+
`pgoutput-client` owns PostgreSQL logical replication transport and lifecycle
369+
management. It opens the replication connection, optionally creates the logical
370+
replication slot, starts streaming, sends standby status feedback, and retries
371+
reconnectable failures.
372+
373+
### Idle standby feedback
374+
375+
Long-running replication streams can be quiet for long periods when no WAL
376+
changes are produced. During those idle periods the client wakes periodically
377+
and sends standby status feedback so PostgreSQL does not terminate the walsender
378+
for replication timeout.
379+
380+
Control the feedback cadence with `feedback_interval`:
381+
382+
```ruby
383+
runner = Pgoutput::Client::Runner.new(
384+
database_url: ENV.fetch("DATABASE_URL"),
385+
slot_name: "mammoth_live",
386+
publication_names: ["mammoth_publication"],
387+
feedback_interval: 10.0
388+
)
389+
```
390+
391+
### Idempotent automatic slot creation
392+
393+
When `auto_create_slot` is enabled, the client treats slot creation as
394+
"ensure this slot exists". Missing slots are created before streaming; existing
395+
slots are reused and do not cause startup failure.
396+
397+
```ruby
398+
runner = Pgoutput::Client::Runner.new(
399+
database_url: ENV.fetch("DATABASE_URL"),
400+
slot_name: "mammoth_live",
401+
publication_names: ["mammoth_publication"],
402+
auto_create_slot: true,
403+
temporary_slot: false
404+
)
405+
```
406+
407+
Publication creation remains outside this gem. Create publications through
408+
application migrations, database bootstrap SQL, or infrastructure tooling.
409+
410+
### Restart recovery
411+
412+
After a stream has connected successfully, transient PostgreSQL outages are
413+
retried through the reconnect lifecycle. This includes ordinary container or
414+
process restart windows where PostgreSQL temporarily refuses connections or
415+
reports that the database system is starting up.
416+
366417
---
367418

368419
## License

‎lib/pgoutput/client.rb‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
require_relative "client/stream"
1414
require_relative "client/runner"
1515

16+
# Namespace for PostgreSQL pgoutput logical replication components.
17+
#
18+
# The top-level namespace is shared by pgoutput ecosystem gems. This gem
19+
# defines only the `Pgoutput::Client` transport namespace and leaves protocol
20+
# parsing, value decoding, and CDC normalization to sibling libraries.
21+
#
22+
# @api public
1623
module Pgoutput
1724
# Namespace for PostgreSQL logical replication transport support.
1825
#

‎lib/pgoutput/client/connection.rb‎

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,19 @@ def start_replication
7676

7777
# Receive one CopyData payload from the server.
7878
#
79-
# The call is non-blocking because the underlying `pg` call receives
80-
# `false` for its blocking argument. `nil` means no complete CopyData
81-
# payload is currently available.
79+
# The stream must not block forever while PostgreSQL is idle, because the
80+
# caller needs opportunities to send periodic standby feedback. Wait
81+
# briefly for socket readability, then use the pg driver's blocking
82+
# CopyData read only when data is available. `nil` means the stream is
83+
# currently idle.
8284
#
8385
# @return [String, nil] raw CopyData payload or `nil`
8486
# @raise [ConnectionError] if receiving fails
8587
def get_copy_data # rubocop:disable Naming/AccessorMethodName
86-
@pg_connection.get_copy_data(false)
88+
return nil unless copy_data_readable?
89+
90+
copy_data = @pg_connection.get_copy_data(false)
91+
copy_data == false ? nil : copy_data
8792
rescue PG::Error => e
8893
raise ConnectionError, e.message
8994
end
@@ -110,6 +115,15 @@ def close
110115

111116
private
112117

118+
def copy_data_readable?
119+
return true unless @pg_connection.respond_to?(:socket_io)
120+
121+
socket = @pg_connection.socket_io
122+
return true unless socket
123+
124+
!!IO.select([socket], nil, nil, 0.1)
125+
end
126+
113127
def exec(sql)
114128
@pg_connection.exec(sql)
115129
rescue PG::Error => e

‎lib/pgoutput/client/feedback.rb‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
module Pgoutput
44
module Client
5+
# Internal immutable base class generated by `Data.define` for {Feedback}.
6+
#
7+
# @api private
58
FeedbackData = Data.define(:received_lsn, :flushed_lsn, :applied_lsn, :client_clock, :reply_requested)
69

710
# Standby status feedback message builder.

‎lib/pgoutput/client/keepalive.rb‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
module Pgoutput
44
module Client
5+
# Internal immutable base class generated by `Data.define` for {Keepalive}.
6+
#
7+
# @api private
58
KeepaliveData = Data.define(:wal_end, :server_clock, :reply_requested)
69

710
# Immutable primary keepalive replication message.

‎lib/pgoutput/client/runner.rb‎

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,17 @@ module Client
3737
# @see Stream
3838
# @api public
3939
class Runner
40+
# Default number of reconnect attempts after a previously healthy stream
41+
# fails. The default is intentionally large enough to survive ordinary
42+
# PostgreSQL restart windows.
43+
#
44+
# @return [Integer]
4045
DEFAULT_RECONNECT_ATTEMPTS = 30
46+
47+
# Base reconnect backoff, in seconds. Attempt `n` sleeps for
48+
# `n * DEFAULT_RECONNECT_BACKOFF`.
49+
#
50+
# @return [Float]
4151
DEFAULT_RECONNECT_BACKOFF = 0.5
4252

4353
# Configuration used by this runner.

‎lib/pgoutput/client/state.rb‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
module Pgoutput
44
module Client
5+
# Internal immutable base class generated by `Data.define` for {RunnerState}.
6+
#
7+
# @api private
58
RunnerStateData = Data.define(
69
:running,
710
:stop_requested,

‎lib/pgoutput/client/version.rb‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ module Client
55
# Current pgoutput-client gem version.
66
#
77
# @return [String]
8-
VERSION = "0.2.3"
8+
VERSION = "0.2.4"
99
end
1010
end

‎lib/pgoutput/client/xlog_data.rb‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
module Pgoutput
44
module Client
5+
# Internal immutable base class generated by `Data.define` for {XLogData}.
6+
#
7+
# @api private
58
ReplicationXLogData = Data.define(:wal_start, :wal_end, :server_clock, :payload)
69

710
# Immutable XLogData replication envelope.

0 commit comments

Comments
 (0)