From 962eeed1850de652ad25ede1df70aa40090643a3 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:34:47 +0100 Subject: [PATCH 1/8] kvdb: add DeferrableBackend interface for postgres Add a DeferrableBackend interface that backends can optionally implement to indicate they prefer deferring heavy operations (like bulk deletes of revocation logs) to startup rather than executing them inline. This is needed for postgres backends where concurrent large transactions can cause lock contention and timeouts during normal operation. The implementation: - Adds DeferrableBackend interface with ShouldDeferHeavyOperations() - Adds helper function to check if a backend implements the interface - Implements the interface on sqlbase.db, returning true only for postgres (identified by "pgx" driver) and false for sqlite run go mod tidy --- go.mod | 3 +++ go.sum | 2 -- kvdb/interface.go | 25 +++++++++++++++++++++++++ kvdb/sqlbase/db.go | 10 ++++++++++ 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 7728da649ef..752fe63afa1 100644 --- a/go.mod +++ b/go.mod @@ -205,6 +205,9 @@ require ( // TODO(elle): remove once the gossip V2 sqldb changes have been made. replace github.com/lightningnetwork/lnd/sqldb => ./sqldb +// TODO: remove once the deferred cleanup changes have been released. 
+replace github.com/lightningnetwork/lnd/kvdb => ./kvdb + // This replace is for https://github.com/advisories/GHSA-25xm-hr59-7c27 replace github.com/ulikunitz/xz => github.com/ulikunitz/xz v0.5.11 diff --git a/go.sum b/go.sum index fed066fb800..954a1efbc36 100644 --- a/go.sum +++ b/go.sum @@ -378,8 +378,6 @@ github.com/lightningnetwork/lnd/fn/v2 v2.0.9 h1:ZytG4ltPac/sCyg1EJDn10RGzPIDJeye github.com/lightningnetwork/lnd/fn/v2 v2.0.9/go.mod h1:aPUJHJ31S+Lgoo8I5SxDIjnmeCifqujaiTXKZqpav3w= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= -github.com/lightningnetwork/lnd/kvdb v1.4.16 h1:9BZgWdDfjmHRHLS97cz39bVuBAqMc4/p3HX1xtUdbDI= -github.com/lightningnetwork/lnd/kvdb v1.4.16/go.mod h1:HW+bvwkxNaopkz3oIgBV6NEnV4jCEZCACFUcNg4xSjM= github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI= github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4= github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM= diff --git a/kvdb/interface.go b/kvdb/interface.go index 5fd1ecd16a9..d095b3b7caf 100644 --- a/kvdb/interface.go +++ b/kvdb/interface.go @@ -145,6 +145,31 @@ func RootBucket(t RTx) RBucket { return nil } +// DeferrableBackend is an optional interface that backends can implement to +// indicate they prefer deferring heavy operations (like bulk deletes) to +// startup rather than executing them inline. SQL-based backends typically +// implement this interface since concurrent large transactions can cause +// lock contention and timeouts. +type DeferrableBackend interface { + // ShouldDeferHeavyOperations returns true if the backend prefers to + // defer expensive operations (like deleting thousands of revocation + // log entries) to startup rather than executing them inline during + // normal operations. 
+ ShouldDeferHeavyOperations() bool +} + +// ShouldDeferHeavyOperations checks if the backend implements +// DeferrableBackend and if so, returns the result of +// ShouldDeferHeavyOperations(). Returns false for backends that don't +// implement the interface (like bbolt). +func ShouldDeferHeavyOperations(backend Backend) bool { + if db, ok := backend.(DeferrableBackend); ok { + return db.ShouldDeferHeavyOperations() + } + + return false +} + var ( // ErrBucketNotFound is returned when trying to access a bucket that // has not been created yet. diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go index 8ff7f979aff..e0c13d7918b 100644 --- a/kvdb/sqlbase/db.go +++ b/kvdb/sqlbase/db.go @@ -95,6 +95,16 @@ type db struct { // Enforce db implements the walletdb.DB interface. var _ walletdb.DB = (*db)(nil) +// ShouldDeferHeavyOperations returns true for postgres backends, indicating +// they prefer deferring heavy operations (like bulk deletes of revocation logs) +// to startup. This helps avoid lock contention and transaction timeouts that +// can occur when multiple channels are closed concurrently. SQLite uses the +// same immediate cleanup approach as bbolt since it doesn't have the same +// concurrency constraints. +func (db *db) ShouldDeferHeavyOperations() bool { + return db.cfg.DriverName == "pgx" +} + var ( // dbConns is a global set of database connections. dbConns *dbConnSet From d982765dd66ba2535d5653bed42d71d06efa3e4d Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:35:03 +0100 Subject: [PATCH 2/8] channeldb: add deferred channel cleanup for postgres For postgres backends, defer heavy cleanup operations (deleting revocation logs, forwarding packages) to startup rather than performing them inline during CloseChannel. This avoids lock contention and transaction timeouts that can occur when multiple channels are closed concurrently. 
Changes: - Add pendingCleanupBucket to store channels awaiting cleanup - Add PendingCleanupInfo struct with Encode/Decode methods - Modify CloseChannel to check kvdb.ShouldDeferHeavyOperations(): - For postgres: store cleanup info and defer deletion to startup - For bbolt/sqlite: perform immediate cleanup as before - Add storePendingCleanup() and performImmediateCleanup() helpers --- channeldb/channel.go | 170 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 142 insertions(+), 28 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index d5764759455..81bd14788f3 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -78,6 +78,12 @@ var ( // is retained. historicalChannelBucket = []byte("historical-chan-bucket") + // pendingCleanupBucket stores information about channels that have been + // closed but whose data (revocation logs, forwarding packages) has not + // yet been deleted. This is used by SQL backends to defer heavy cleanup + // operations to startup. + pendingCleanupBucket = []byte("pending-cleanup-bucket") + // chanInfoKey can be accessed within the bucket for a channel // (identified by its chanPoint). This key stores all the static // information for a channel which is decided at the end of the @@ -3759,6 +3765,57 @@ const ( Abandoned ClosureType = 5 ) +// PendingCleanupInfo contains the information needed to clean up a channel's +// data after it has been closed. This is used by SQL backends to defer heavy +// deletion operations to startup. +type PendingCleanupInfo struct { + // ChanPoint is the funding outpoint of the channel. + ChanPoint wire.OutPoint + + // ShortChanID is the short channel ID of the channel. + ShortChanID lnwire.ShortChannelID + + // NodePub is the compressed public key of the remote node. + NodePub [33]byte + + // ChainHash is the hash of the chain this channel belongs to. + ChainHash chainhash.Hash +} + +// Encode serializes the PendingCleanupInfo to the given writer. 
+func (p *PendingCleanupInfo) Encode(w io.Writer) error { + if err := WriteElements(w, p.ChanPoint, p.ShortChanID); err != nil { + return err + } + + if _, err := w.Write(p.NodePub[:]); err != nil { + return err + } + + if _, err := w.Write(p.ChainHash[:]); err != nil { + return err + } + + return nil +} + +// Decode deserializes the PendingCleanupInfo from the given reader. +func (p *PendingCleanupInfo) Decode(r io.Reader) error { + if err := ReadElements(r, &p.ChanPoint, &p.ShortChanID); err != nil { + return err + } + + if _, err := io.ReadFull(r, p.NodePub[:]); err != nil { + return err + } + + if _, err := io.ReadFull(r, p.ChainHash[:]); err != nil { + return err + } + + return nil +} + // ChannelCloseSummary contains the final state of a channel at the point it // was closed. Once a channel is closed, all the information pertaining to that // channel within the openChannelBucket is deleted, and a compact summary is @@ -3853,6 +3910,10 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, c.Lock() defer c.Unlock() + // Check if the backend prefers deferring heavy operations to startup. + // Postgres backends return true here to avoid lock contention. + deferCleanup := kvdb.ShouldDeferHeavyOperations(c.Db.backend) + return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error { openChanBucket := tx.ReadWriteBucket(openChannelBucket) if openChanBucket == nil { @@ -3893,37 +3954,25 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, return err } - // Delete all the forwarding packages stored for this particular - // channel. - if err = chanState.Packager.Wipe(tx); err != nil { - return err - } - - // Now that the index to this channel has been deleted, purge - // the remaining channel metadata from the database. - err = deleteOpenChannel(chanBucket) - if err != nil { - return err - } - - // We'll also remove the channel from the frozen channel bucket - // if we need to. 
- if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() { - err := deleteThawHeight(chanBucket) + if deferCleanup { + // For postgres backends, store cleanup info and defer + // the heavy deletion operations to startup. + err = storePendingCleanup( + tx, c, nodePub, chanKey, + ) + if err != nil { + return err + } + } else { + // For non-postgres backends (bbolt, sqlite), perform + // immediate cleanup. + err = performImmediateCleanup( + tx, chanState, chanBucket, chainBucket, + chanPointBuf.Bytes(), + ) if err != nil { return err } - } - - // With the base channel data deleted, attempt to delete the - // information stored within the revocation log. - if err := deleteLogBucket(chanBucket); err != nil { - return err - } - - err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes()) - if err != nil { - return err } // Fetch the outpoint bucket to see if the outpoint exists or @@ -4733,6 +4782,71 @@ func deleteOpenChannel(chanBucket kvdb.RwBucket) error { return nil } +// storePendingCleanup stores cleanup info for a channel to be processed at +// startup. This is used by postgres backends to defer heavy deletion +// operations. +func storePendingCleanup(tx kvdb.RwTx, c *OpenChannel, nodePub []byte, + chanKey []byte) error { + + cleanupBucket, err := tx.CreateTopLevelBucket(pendingCleanupBucket) + if err != nil { + return err + } + + var nodePubKey [33]byte + copy(nodePubKey[:], nodePub) + + cleanupInfo := &PendingCleanupInfo{ + ChanPoint: c.FundingOutpoint, + ShortChanID: c.ShortChannelID, + NodePub: nodePubKey, + ChainHash: c.ChainHash, + } + + var cleanupBuf bytes.Buffer + if err := cleanupInfo.Encode(&cleanupBuf); err != nil { + return err + } + + return cleanupBucket.Put(chanKey, cleanupBuf.Bytes()) +} + +// performImmediateCleanup handles the cleanup operations that are performed +// immediately during channel close for non-postgres backends (bbolt, sqlite). 
+// This includes wiping forwarding packages, deleting channel data, thaw height, +// revocation logs, and the channel bucket itself. +func performImmediateCleanup(tx kvdb.RwTx, chanState *OpenChannel, + chanBucket kvdb.RwBucket, chainBucket kvdb.RwBucket, + chanKey []byte) error { + + // Delete all the forwarding packages stored for this channel. + if err := chanState.Packager.Wipe(tx); err != nil { + return err + } + + // Purge the remaining channel metadata from the database. + if err := deleteOpenChannel(chanBucket); err != nil { + return err + } + + // Remove the channel from the frozen channel bucket if needed. + if chanState.ChanType.IsFrozen() || + chanState.ChanType.HasLeaseExpiration() { + + if err := deleteThawHeight(chanBucket); err != nil { + return err + } + } + + // Delete the information stored within the revocation log. + if err := deleteLogBucket(chanBucket); err != nil { + return err + } + + // Delete the channel bucket itself. + return chainBucket.DeleteNestedBucket(chanKey) +} + // makeLogKey converts a uint64 into an 8 byte array. func makeLogKey(updateNum uint64) [8]byte { var key [8]byte From 1baac42cb7de996d4da864b9edb1c19a7166ff8c Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:35:19 +0100 Subject: [PATCH 3/8] channeldb: add CleanupPendingCloses for startup cleanup Add CleanupPendingCloses() function to process channels that were closed but whose heavy cleanup was deferred to startup. This is used by postgres backends to avoid lock contention during normal operation. The function: - Reads all entries from pendingCleanupBucket - For each entry, performs the deferred cleanup operations: - Wipes forwarding packages - Deletes channel metadata - Removes thaw height if applicable - Deletes revocation logs - Removes the channel bucket - Removes the processed entry from pendingCleanupBucket For non-postgres backends, this function is a no-op as the bucket will be empty (they perform immediate cleanup). 
--- channeldb/db.go | 151 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 00b29f65f9f..c4a4609dbc5 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -2023,6 +2023,157 @@ func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome( }, func() {}) } +// CleanupPendingCloses processes any channels that were closed but whose heavy +// cleanup operations (deleting revocation logs, forwarding packages) were +// deferred to startup. This is used by postgres backends to avoid lock +// contention during normal operation. +func (c *ChannelStateDB) CleanupPendingCloses() error { + // First, collect all the pending cleanup entries. + var cleanupEntries []*PendingCleanupInfo + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + cleanupBucket := tx.ReadBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + return cleanupBucket.ForEach(func(k, v []byte) error { + info := &PendingCleanupInfo{} + if err := info.Decode(bytes.NewReader(v)); err != nil { + return err + } + + cleanupEntries = append(cleanupEntries, info) + + return nil + }) + }, func() { + cleanupEntries = nil + }) + if err != nil { + return err + } + + if len(cleanupEntries) == 0 { + return nil + } + + log.Infof("Processing %d deferred channel cleanups", + len(cleanupEntries)) + + // Process each cleanup entry. + for _, info := range cleanupEntries { + err := c.cleanupChannel(info) + if err != nil { + log.Warnf("Failed to cleanup channel %v: %v", + info.ChanPoint, err) + continue + } + + log.Debugf("Cleaned up deferred channel data for %v", + info.ChanPoint) + } + + return nil +} + +// cleanupChannel performs the actual cleanup for a single channel. +func (c *ChannelStateDB) cleanupChannel(info *PendingCleanupInfo) error { + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + // Get the open channel bucket structure. 
+ openChanBucket := tx.ReadWriteBucket(openChannelBucket) + if openChanBucket == nil { + // If there's no open channel bucket, nothing to clean. + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + nodeChanBucket := openChanBucket.NestedReadWriteBucket( + info.NodePub[:], + ) + if nodeChanBucket == nil { + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + chainBucket := nodeChanBucket.NestedReadWriteBucket( + info.ChainHash[:], + ) + if chainBucket == nil { + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + var chanPointBuf bytes.Buffer + err := graphdb.WriteOutpoint(&chanPointBuf, &info.ChanPoint) + if err != nil { + return err + } + chanKey := chanPointBuf.Bytes() + + chanBucket := chainBucket.NestedReadWriteBucket(chanKey) + if chanBucket == nil { + // Channel bucket doesn't exist, just remove the + // pending cleanup entry. + return c.removePendingCleanup(tx, &info.ChanPoint) + } + + // Fetch the channel state to get the packager. + chanState, err := fetchOpenChannel( + chanBucket, &info.ChanPoint, + ) + if err != nil { + return err + } + + // Delete all the forwarding packages stored for this channel. + if err := chanState.Packager.Wipe(tx); err != nil { + return err + } + + // Purge the remaining channel metadata from the database. + if err := deleteOpenChannel(chanBucket); err != nil { + return err + } + + // Remove the channel from the frozen channel bucket if needed. + if chanState.ChanType.IsFrozen() || + chanState.ChanType.HasLeaseExpiration() { + + if err := deleteThawHeight(chanBucket); err != nil { + return err + } + } + + // Delete the information stored within the revocation log. + if err := deleteLogBucket(chanBucket); err != nil { + return err + } + + // Delete the channel bucket itself. + if err := chainBucket.DeleteNestedBucket(chanKey); err != nil { + return err + } + + // Finally, remove the pending cleanup entry. 
+ return c.removePendingCleanup(tx, &info.ChanPoint) + }, func() {}) +} + +// removePendingCleanup removes a channel's entry from the pending cleanup +// bucket. +func (c *ChannelStateDB) removePendingCleanup(tx kvdb.RwTx, + chanPoint *wire.OutPoint) error { + + cleanupBucket := tx.ReadWriteBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + var chanPointBuf bytes.Buffer + if err := graphdb.WriteOutpoint(&chanPointBuf, chanPoint); err != nil { + return err + } + + return cleanupBucket.Delete(chanPointBuf.Bytes()) +} + // MakeTestInvoiceDB is used to create a test invoice database for testing // purposes. It simply calls into MakeTestDB so the same modifiers can be used. func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) ( From ab92f78dfef5217d0b2d3e2eebfdea00bd6292c2 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:35:29 +0100 Subject: [PATCH 4/8] config: call CleanupPendingCloses at startup for postgres Call CleanupPendingCloses() during LND startup to process any channels that had their heavy cleanup deferred. This is only done for postgres backends (checked via kvdb.ShouldDeferHeavyOperations). For bbolt and sqlite backends, this check is skipped as they perform immediate cleanup during CloseChannel. --- config_builder.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/config_builder.go b/config_builder.go index 7ce63041ee2..bed1437ab4e 100644 --- a/config_builder.go +++ b/config_builder.go @@ -1094,11 +1094,28 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( return nil, nil, err } + // Process any deferred channel cleanups. For postgres backends, heavy + // cleanup operations (deleting revocation logs, forwarding packages) + // are deferred to startup to avoid lock contention. For other backends + // (bbolt, sqlite), this is a no-op as they perform immediate cleanup. 
+ if kvdb.ShouldDeferHeavyOperations(databaseBackends.ChanStateDB) { + err = dbs.ChanStateDB.ChannelStateDB().CleanupPendingCloses() + if err != nil { + cleanUp() + + err = fmt.Errorf("unable to cleanup pending "+ + "closes: %w", err) + d.logger.Error(err) + return nil, nil, err + } + } + // The graph store implementation we will use depends on whether // native SQL is enabled or not. var graphStore graphdb.V1Store // Instantiate a native SQL store if the flag is set. + //nolint:nestif if d.cfg.DB.UseNativeSQL { migrations := sqldb.GetMigrations() From e451f8fabb89c3433af3ad8836d4d716d8826ff0 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:38:50 +0100 Subject: [PATCH 5/8] channeldb: add unit tests for deferred cleanup Add unit tests for the deferred channel cleanup functionality: - TestPendingCleanupInfoEncodeDecode: Tests that PendingCleanupInfo can be properly encoded and decoded - TestCleanupPendingClosesEmpty: Tests that CleanupPendingCloses works correctly when there are no pending cleanups - TestImmediateCleanupOnClose: Tests that for non-postgres backends, channel close performs immediate cleanup without deferring --- channeldb/db_test.go | 105 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/channeldb/db_test.go b/channeldb/db_test.go index e2e9a197004..95c92b2320a 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -1,6 +1,7 @@ package channeldb import ( + "bytes" "image/color" "math" "math/rand" @@ -833,3 +834,107 @@ func createTestVertex(t *testing.T) *models.Node { return createNode(priv) } + +// TestPendingCleanupInfoEncodeDecode tests that PendingCleanupInfo can be +// properly encoded and decoded. +func TestPendingCleanupInfoEncodeDecode(t *testing.T) { + t.Parallel() + + // Create a test PendingCleanupInfo. 
+ var nodePub [33]byte + copy(nodePub[:], bytes.Repeat([]byte{0x02}, 33)) + + chanPoint := wire.OutPoint{ + Hash: chainhash.Hash{0x01, 0x02, 0x03}, + Index: 42, + } + shortChanID := lnwire.NewShortChanIDFromInt(123456) + chainHash := chainhash.Hash{0x0a, 0x0b, 0x0c} + + info := &PendingCleanupInfo{ + ChanPoint: chanPoint, + ShortChanID: shortChanID, + NodePub: nodePub, + ChainHash: chainHash, + } + + // Encode it. + var buf bytes.Buffer + err := info.Encode(&buf) + require.NoError(t, err) + + // Decode it. + decoded := &PendingCleanupInfo{} + err = decoded.Decode(&buf) + require.NoError(t, err) + + // Verify all fields match. + require.Equal(t, info.ChanPoint, decoded.ChanPoint) + require.Equal(t, info.ShortChanID, decoded.ShortChanID) + require.Equal(t, info.NodePub, decoded.NodePub) + require.Equal(t, info.ChainHash, decoded.ChainHash) +} + +// TestCleanupPendingClosesEmpty tests that CleanupPendingCloses works +// correctly when there are no pending cleanups. +func TestCleanupPendingClosesEmpty(t *testing.T) { + t.Parallel() + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + + // Calling CleanupPendingCloses when there's nothing to clean should + // succeed without error. + err = cdb.CleanupPendingCloses() + require.NoError(t, err) +} + +// TestImmediateCleanupOnClose tests that for non-postgres backends (like +// bbolt and sqlite), channel close performs immediate cleanup without +// deferring to the pending cleanup bucket. +func TestImmediateCleanupOnClose(t *testing.T) { + t.Parallel() + + // Skip this test for postgres as it defers cleanup. + if kvdb.PostgresBackend { + t.Skip("Skipping test for postgres backend") + } + + fullDB, err := MakeTestDB(t) + require.NoError(t, err) + + cdb := fullDB.ChannelStateDB() + + // Create an open channel. + channel := createTestChannel(t, cdb, openChannelOption()) + + // Close the channel. 
+ err = channel.CloseChannel(&ChannelCloseSummary{ + ChanPoint: channel.FundingOutpoint, + RemotePub: channel.IdentityPub, + SettledBalance: btcutil.Amount(500), + }) + require.NoError(t, err) + + // For non-postgres backends, the pending cleanup bucket should be + // empty (or not exist). + var pendingCleanupCount int + err = kvdb.View(fullDB.Backend, func(tx kvdb.RTx) error { + cleanupBucket := tx.ReadBucket(pendingCleanupBucket) + if cleanupBucket == nil { + return nil + } + + return cleanupBucket.ForEach(func(k, v []byte) error { + pendingCleanupCount++ + return nil + }) + }, func() { + pendingCleanupCount = 0 + }) + require.NoError(t, err) + require.Zero(t, pendingCleanupCount, + "expected no pending cleanup entries for non-postgres backend") +} From a566b9f6ad9e722554b6881b54cee1f7900d9468 Mon Sep 17 00:00:00 2001 From: ziggie Date: Sun, 23 Nov 2025 19:39:04 +0100 Subject: [PATCH 6/8] itest: split wipe forwarding packages test by backend Split the wipe forwarding packages integration test into backend-specific versions to account for the different cleanup behaviors: - For bbolt/sqlite (immediate cleanup): - Test remains in list_on_bbolt_test.go with build tag !kvdb_postgres - Forwarding packages are deleted immediately during CloseChannel - No node restart needed to verify cleanup - For postgres (deferred cleanup): - New test in lnd_wipe_fwdpkgs_sql_test.go with build tag kvdb_postgres - Forwarding packages are NOT deleted during CloseChannel - Node restart triggers CleanupPendingCloses() which performs cleanup - Test verifies cleanup after restart Also updates list_exclude_test.go to include both test variants in the Windows exclusion list. 
--- itest/list_on_bbolt_test.go | 19 ++++++ itest/list_on_sql_test.go | 18 +++++ itest/list_on_test.go | 4 -- itest/lnd_wipe_fwdpkgs_sql_test.go | 103 +++++++++++++++++++++++++++++ 4 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 itest/list_on_bbolt_test.go create mode 100644 itest/list_on_sql_test.go create mode 100644 itest/lnd_wipe_fwdpkgs_sql_test.go diff --git a/itest/list_on_bbolt_test.go b/itest/list_on_bbolt_test.go new file mode 100644 index 00000000000..b9ac114c037 --- /dev/null +++ b/itest/list_on_bbolt_test.go @@ -0,0 +1,19 @@ +//go:build integration && !kvdb_postgres + +package itest + +import "github.com/lightningnetwork/lnd/lntest" + +// immediateCleanupTestCases is a list of tests that are only run when using +// bbolt or sqlite backends. These backends perform immediate cleanup during +// channel close, unlike postgres which defers cleanup to startup. +var immediateCleanupTestCases = []*lntest.TestCase{ + { + Name: "wipe forwarding packages", + TestFunc: testWipeForwardingPackages, + }, +} + +func init() { + allTestCases = append(allTestCases, immediateCleanupTestCases...) +} diff --git a/itest/list_on_sql_test.go b/itest/list_on_sql_test.go new file mode 100644 index 00000000000..598969cb577 --- /dev/null +++ b/itest/list_on_sql_test.go @@ -0,0 +1,18 @@ +//go:build integration && kvdb_postgres + +package itest + +import "github.com/lightningnetwork/lnd/lntest" + +// postgresTestCases is a list of tests that are only run when using postgres +// backend. These tests verify postgres-specific behavior like deferred cleanup. +var postgresTestCases = []*lntest.TestCase{ + { + Name: "wipe forwarding packages", + TestFunc: testWipeForwardingPackagesPostgres, + }, +} + +func init() { + allTestCases = append(allTestCases, postgresTestCases...) 
+} diff --git a/itest/list_on_test.go b/itest/list_on_test.go index f5961147d3a..530f341794f 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -383,10 +383,6 @@ var allTestCases = []*lntest.TestCase{ Name: "single hop invoice", TestFunc: testSingleHopInvoice, }, - { - Name: "wipe forwarding packages", - TestFunc: testWipeForwardingPackages, - }, { Name: "switch circuit persistence", TestFunc: testSwitchCircuitPersistence, diff --git a/itest/lnd_wipe_fwdpkgs_sql_test.go b/itest/lnd_wipe_fwdpkgs_sql_test.go new file mode 100644 index 00000000000..260cb3b2a52 --- /dev/null +++ b/itest/lnd_wipe_fwdpkgs_sql_test.go @@ -0,0 +1,103 @@ +//go:build integration && kvdb_postgres + +package itest + +import ( + "github.com/lightningnetwork/lnd/chainreg" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntest" + "github.com/stretchr/testify/require" +) + +// testWipeForwardingPackagesPostgres tests that for postgres backends, +// forwarding packages are deleted when CleanupPendingCloses runs at startup +// (not during CloseChannel like bbolt/sqlite). This test verifies the deferred +// cleanup behavior that avoids lock contention during normal operation. +func testWipeForwardingPackagesPostgres(ht *lntest.HarnessTest) { + const ( + chanAmt = 10e6 + paymentAmt = 10e4 + finalCTLVDelta = chainreg.DefaultBitcoinTimeLockDelta + numInvoices = 3 + ) + + chanPoints, nodes := ht.CreateSimpleNetwork( + [][]string{nil, nil, nil}, + lntest.OpenChannelParams{Amt: chanAmt}, + ) + chanPointAB, chanPointBC := chanPoints[0], chanPoints[1] + alice, bob, carol := nodes[0], nodes[1], nodes[2] + + // Before we continue, make sure Alice has seen the channel between Bob + // and Carol. + ht.AssertChannelInGraph(alice, chanPointBC) + + // Alice sends several payments to Carol through Bob, which triggers + // Bob to create forwarding packages. + for i := 0; i < numInvoices; i++ { + // Add an invoice for Carol. 
+ invoice := &lnrpc.Invoice{Memo: "testing", Value: paymentAmt} + resp := carol.RPC.AddInvoice(invoice) + + // Alice sends a payment to Carol through Bob. + ht.CompletePaymentRequests(alice, []string{resp.PaymentRequest}) + } + + flakePaymentStreamReturnEarly() + + // Firstly, Bob force closes the channel. + ht.CloseChannelAssertPending(bob, chanPointAB, true) + + // Now that the channel has been force closed, it should show up in + // bob's PendingChannels RPC under the waiting close section. + pendingAB := ht.AssertChannelWaitingClose(bob, chanPointAB).Channel + + // Check that Bob has created forwarding packages. We don't care the + // exact number here as long as these packages are deleted when the + // channel is closed. + require.NotZero(ht, pendingAB.NumForwardingPackages) + + // Secondly, Bob coop closes the channel. + ht.CloseChannelAssertPending(bob, chanPointBC, false) + + // Now that the channel has been coop closed, it should show up in + // bob's PendingChannels RPC under the waiting close section. + pendingBC := ht.AssertChannelWaitingClose(bob, chanPointBC).Channel + + // Check that Bob has created forwarding packages. We don't care the + // exact number here as long as these packages are deleted when the + // channel is closed. + require.NotZero(ht, pendingBC.NumForwardingPackages) + + // Since it's a coop close, Carol should see the waiting close channel + // too. + pendingBC = ht.AssertChannelWaitingClose(carol, chanPointBC).Channel + require.NotZero(ht, pendingBC.NumForwardingPackages) + + // Mine 1 block to get the two closing transactions confirmed. + ht.MineBlocksAndAssertNumTxes(1, 2) + + // For SQL backends, the forwarding packages are NOT deleted during + // CloseChannel - they are deferred to CleanupPendingCloses at startup. + // Restart the nodes to trigger the cleanup. 
+	ht.RestartNode(bob)
+	ht.RestartNode(alice)
+
+	// Now that the closing transaction is confirmed, the above waiting
+	// close channel should now become pending force closed channel.
+	pendingAB = ht.AssertChannelPendingForceClose(bob, chanPointAB).Channel
+
+	// Check the forwarding packages are deleted after the restart.
+	require.Zero(ht, pendingAB.NumForwardingPackages)
+
+	// For Alice, the forwarding packages should have been wiped too.
+	pending := ht.AssertChannelPendingForceClose(alice, chanPointAB)
+	pendingAB = pending.Channel
+	require.Zero(ht, pendingAB.NumForwardingPackages)
+
+	// Alice should have one pending sweep.
+	ht.AssertNumPendingSweeps(alice, 1)
+
+	// Mine 1 block to get Alice's sweeping tx confirmed.
+	ht.MineBlocksAndAssertNumTxes(1, 1)
+}

From 768af02cd73731a745545d6e2548f7a183357841 Mon Sep 17 00:00:00 2001
From: ziggie
Date: Sun, 23 Nov 2025 19:42:19 +0100
Subject: [PATCH 7/8] channeldb: fix tests to handle deferred cleanup for
 postgres

Update tests that close channels and immediately check for deletion to
handle the deferred cleanup behavior for postgres backends:

- TestAbandonChannel
- TestOpenChannelPutGetDelete
- TestChannelStateTransition

For postgres backends (where ShouldDeferHeavyOperations returns true),
call CleanupPendingCloses() after closing channels to process the
deferred cleanup before asserting that channels are deleted.
---
 channeldb/channel_test.go | 12 ++++++++++++
 channeldb/db_test.go      |  6 ++++++
 2 files changed, 18 insertions(+)

diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go
index 47504067780..70e942da136 100644
--- a/channeldb/channel_test.go
+++ b/channeldb/channel_test.go
@@ -509,6 +509,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
 		t.Fatalf("unable to close channel: %v", err)
 	}
 
+	// For postgres backends, cleanup is deferred. Process it now.
+ if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // As the channel is now closed, attempting to fetch all open channels // for our fake node ID should return an empty slice. openChans, err := cdb.FetchOpenChannels(state.IdentityPub) @@ -955,6 +961,12 @@ func TestChannelStateTransition(t *testing.T) { t.Fatalf("unable to delete updated channel: %v", err) } + // For postgres backends, cleanup is deferred. Process it now. + if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // If we attempt to fetch the target channel again, it shouldn't be // found. channels, err := cdb.FetchOpenChannels(channel.IdentityPub) diff --git a/channeldb/db_test.go b/channeldb/db_test.go index 95c92b2320a..c7802cf5594 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -456,6 +456,12 @@ func TestAbandonChannel(t *testing.T) { err = cdb.AbandonChannel(&chanState.FundingOutpoint, closeHeight) require.NoError(t, err, "unable to abandon channel") + // For postgres backends, cleanup is deferred. Process it now. + if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) { + err = cdb.CleanupPendingCloses() + require.NoError(t, err, "unable to cleanup pending closes") + } + // At this point, the channel should no longer be found in the set of // open channels. 
 	_, err = cdb.FetchChannel(chanState.FundingOutpoint)

From bf85a3dc9140d01d6cfae4e6a04536944edb0fac Mon Sep 17 00:00:00 2001
From: ziggie
Date: Sun, 23 Nov 2025 20:49:13 +0100
Subject: [PATCH 8/8] docs: add release-notes for LND 20.1

---
 docs/release-notes/release-notes-0.20.1.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/docs/release-notes/release-notes-0.20.1.md b/docs/release-notes/release-notes-0.20.1.md
index 1963db61b45..c84e32893ad 100644
--- a/docs/release-notes/release-notes-0.20.1.md
+++ b/docs/release-notes/release-notes-0.20.1.md
@@ -43,6 +43,15 @@

 ## Performance Improvements

+* [Defer deletion of closed
+  channel data](https://github.com/lightningnetwork/lnd/pull/10390) to the
+  next restart for postgres backends. Normally deleting everything as soon as
+  the channel is closed is ok, but for the kvdb backend for postgres this can
+  cause severe stress on the kv table, so we introduce this performance
+  improvement until the native sql schema for channels and revocation logs is
+  deployed.
+
+
 ## Deprecations

 # Technical and Architectural Updates