Skip to content

Commit 4927490

Browse files
committed
preliminary commit
need to split commits
1 parent 8c8662c commit 4927490

File tree

11 files changed

+593
-32
lines changed

11 files changed

+593
-32
lines changed

channeldb/channel.go

Lines changed: 142 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ var (
7878
// is retained.
7979
historicalChannelBucket = []byte("historical-chan-bucket")
8080

81+
// pendingCleanupBucket stores information about channels that have been
82+
// closed but whose data (revocation logs, forwarding packages) has not
83+
// yet been deleted. This is used by SQL backends to defer heavy cleanup
84+
// operations to startup.
85+
pendingCleanupBucket = []byte("pending-cleanup-bucket")
86+
8187
// chanInfoKey can be accessed within the bucket for a channel
8288
// (identified by its chanPoint). This key stores all the static
8389
// information for a channel which is decided at the end of the
@@ -3759,6 +3765,57 @@ const (
37593765
Abandoned ClosureType = 5
37603766
)
37613767

3768+
// PendingCleanupInfo contains the information needed to clean up a channel's
3769+
// data after it has been closed. This is used by SQL backends to defer heavy
3770+
// deletion operations to startup.
3771+
type PendingCleanupInfo struct {
3772+
// ChanPoint is the funding outpoint of the channel.
3773+
ChanPoint wire.OutPoint
3774+
3775+
// ShortChanID is the short channel ID of the channel.
3776+
ShortChanID lnwire.ShortChannelID
3777+
3778+
// NodePub is the compressed public key of the remote node.
3779+
NodePub [33]byte
3780+
3781+
// ChainHash is the hash of the chain this channel belongs to.
3782+
ChainHash chainhash.Hash
3783+
}
3784+
3785+
// Encode serializes the PendingCleanupInfo to the given writer.
3786+
func (p *PendingCleanupInfo) Encode(w io.Writer) error {
3787+
if err := WriteElements(w, p.ChanPoint, p.ShortChanID); err != nil {
3788+
return err
3789+
}
3790+
3791+
if _, err := w.Write(p.NodePub[:]); err != nil {
3792+
return err
3793+
}
3794+
3795+
if _, err := w.Write(p.ChainHash[:]); err != nil {
3796+
return err
3797+
}
3798+
3799+
return nil
3800+
}
3801+
3802+
// Decode deserializes the PendingCleanupInfo from the given reader.
3803+
func (p *PendingCleanupInfo) Decode(r io.Reader) error {
3804+
if err := ReadElements(r, &p.ChanPoint, &p.ShortChanID); err != nil {
3805+
return err
3806+
}
3807+
3808+
if _, err := io.ReadFull(r, p.NodePub[:]); err != nil {
3809+
return err
3810+
}
3811+
3812+
if _, err := io.ReadFull(r, p.ChainHash[:]); err != nil {
3813+
return err
3814+
}
3815+
3816+
return nil
3817+
}
3818+
37623819
// ChannelCloseSummary contains the final state of a channel at the point it
37633820
// was closed. Once a channel is closed, all the information pertaining to that
37643821
// channel within the openChannelBucket is deleted, and a compact summary is
@@ -3853,6 +3910,10 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
38533910
c.Lock()
38543911
defer c.Unlock()
38553912

3913+
// Check if the backend prefers deferring heavy operations to startup.
3914+
// Postgres backends return true here to avoid lock contention.
3915+
deferCleanup := kvdb.ShouldDeferHeavyOperations(c.Db.backend)
3916+
38563917
return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error {
38573918
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
38583919
if openChanBucket == nil {
@@ -3893,37 +3954,25 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
38933954
return err
38943955
}
38953956

3896-
// Delete all the forwarding packages stored for this particular
3897-
// channel.
3898-
if err = chanState.Packager.Wipe(tx); err != nil {
3899-
return err
3900-
}
3901-
3902-
// Now that the index to this channel has been deleted, purge
3903-
// the remaining channel metadata from the database.
3904-
err = deleteOpenChannel(chanBucket)
3905-
if err != nil {
3906-
return err
3907-
}
3908-
3909-
// We'll also remove the channel from the frozen channel bucket
3910-
// if we need to.
3911-
if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() {
3912-
err := deleteThawHeight(chanBucket)
3957+
if deferCleanup {
3958+
// For postgres backends, store cleanup info and defer
3959+
// the heavy deletion operations to startup.
3960+
err = storePendingCleanup(
3961+
tx, c, nodePub, chanKey,
3962+
)
3963+
if err != nil {
3964+
return err
3965+
}
3966+
} else {
3967+
// For non-postgres backends (bbolt, sqlite), perform
3968+
// immediate cleanup.
3969+
err = performImmediateCleanup(
3970+
tx, chanState, chanBucket, chainBucket,
3971+
chanPointBuf.Bytes(),
3972+
)
39133973
if err != nil {
39143974
return err
39153975
}
3916-
}
3917-
3918-
// With the base channel data deleted, attempt to delete the
3919-
// information stored within the revocation log.
3920-
if err := deleteLogBucket(chanBucket); err != nil {
3921-
return err
3922-
}
3923-
3924-
err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes())
3925-
if err != nil {
3926-
return err
39273976
}
39283977

39293978
// Fetch the outpoint bucket to see if the outpoint exists or
@@ -4733,6 +4782,71 @@ func deleteOpenChannel(chanBucket kvdb.RwBucket) error {
47334782
return nil
47344783
}
47354784

4785+
// storePendingCleanup stores cleanup info for a channel to be processed at
4786+
// startup. This is used by postgres backends to defer heavy deletion
4787+
// operations.
4788+
func storePendingCleanup(tx kvdb.RwTx, c *OpenChannel, nodePub []byte,
4789+
chanKey []byte) error {
4790+
4791+
cleanupBucket, err := tx.CreateTopLevelBucket(pendingCleanupBucket)
4792+
if err != nil {
4793+
return err
4794+
}
4795+
4796+
var nodePubKey [33]byte
4797+
copy(nodePubKey[:], nodePub)
4798+
4799+
cleanupInfo := &PendingCleanupInfo{
4800+
ChanPoint: c.FundingOutpoint,
4801+
ShortChanID: c.ShortChannelID,
4802+
NodePub: nodePubKey,
4803+
ChainHash: c.ChainHash,
4804+
}
4805+
4806+
var cleanupBuf bytes.Buffer
4807+
if err := cleanupInfo.Encode(&cleanupBuf); err != nil {
4808+
return err
4809+
}
4810+
4811+
return cleanupBucket.Put(chanKey, cleanupBuf.Bytes())
4812+
}
4813+
4814+
// performImmediateCleanup handles the cleanup operations that are performed
4815+
// immediately during channel close for non-postgres backends (bbolt, sqlite).
4816+
// This includes wiping forwarding packages, deleting channel data, thaw height,
4817+
// revocation logs, and the channel bucket itself.
4818+
func performImmediateCleanup(tx kvdb.RwTx, chanState *OpenChannel,
4819+
chanBucket kvdb.RwBucket, chainBucket kvdb.RwBucket,
4820+
chanKey []byte) error {
4821+
4822+
// Delete all the forwarding packages stored for this channel.
4823+
if err := chanState.Packager.Wipe(tx); err != nil {
4824+
return err
4825+
}
4826+
4827+
// Purge the remaining channel metadata from the database.
4828+
if err := deleteOpenChannel(chanBucket); err != nil {
4829+
return err
4830+
}
4831+
4832+
// Remove the channel from the frozen channel bucket if needed.
4833+
if chanState.ChanType.IsFrozen() ||
4834+
chanState.ChanType.HasLeaseExpiration() {
4835+
4836+
if err := deleteThawHeight(chanBucket); err != nil {
4837+
return err
4838+
}
4839+
}
4840+
4841+
// Delete the information stored within the revocation log.
4842+
if err := deleteLogBucket(chanBucket); err != nil {
4843+
return err
4844+
}
4845+
4846+
// Delete the channel bucket itself.
4847+
return chainBucket.DeleteNestedBucket(chanKey)
4848+
}
4849+
47364850
// makeLogKey converts a uint64 into an 8 byte array.
47374851
func makeLogKey(updateNum uint64) [8]byte {
47384852
var key [8]byte

channeldb/db.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2023,6 +2023,157 @@ func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
20232023
}, func() {})
20242024
}
20252025

2026+
// CleanupPendingCloses processes any channels that were closed but whose heavy
2027+
// cleanup operations (deleting revocation logs, forwarding packages) were
2028+
// deferred to startup. This is used by postgres backends to avoid lock
2029+
// contention during normal operation.
2030+
func (c *ChannelStateDB) CleanupPendingCloses() error {
2031+
// First, collect all the pending cleanup entries.
2032+
var cleanupEntries []*PendingCleanupInfo
2033+
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
2034+
cleanupBucket := tx.ReadBucket(pendingCleanupBucket)
2035+
if cleanupBucket == nil {
2036+
return nil
2037+
}
2038+
2039+
return cleanupBucket.ForEach(func(k, v []byte) error {
2040+
info := &PendingCleanupInfo{}
2041+
if err := info.Decode(bytes.NewReader(v)); err != nil {
2042+
return err
2043+
}
2044+
2045+
cleanupEntries = append(cleanupEntries, info)
2046+
2047+
return nil
2048+
})
2049+
}, func() {
2050+
cleanupEntries = nil
2051+
})
2052+
if err != nil {
2053+
return err
2054+
}
2055+
2056+
if len(cleanupEntries) == 0 {
2057+
return nil
2058+
}
2059+
2060+
log.Infof("Processing %d deferred channel cleanups",
2061+
len(cleanupEntries))
2062+
2063+
// Process each cleanup entry.
2064+
for _, info := range cleanupEntries {
2065+
err := c.cleanupChannel(info)
2066+
if err != nil {
2067+
log.Warnf("Failed to cleanup channel %v: %v",
2068+
info.ChanPoint, err)
2069+
continue
2070+
}
2071+
2072+
log.Debugf("Cleaned up deferred channel data for %v",
2073+
info.ChanPoint)
2074+
}
2075+
2076+
return nil
2077+
}
2078+
2079+
// cleanupChannel performs the actual cleanup for a single channel.
2080+
func (c *ChannelStateDB) cleanupChannel(info *PendingCleanupInfo) error {
2081+
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
2082+
// Get the open channel bucket structure.
2083+
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
2084+
if openChanBucket == nil {
2085+
// If there's no open channel bucket, nothing to clean.
2086+
return c.removePendingCleanup(tx, &info.ChanPoint)
2087+
}
2088+
2089+
nodeChanBucket := openChanBucket.NestedReadWriteBucket(
2090+
info.NodePub[:],
2091+
)
2092+
if nodeChanBucket == nil {
2093+
return c.removePendingCleanup(tx, &info.ChanPoint)
2094+
}
2095+
2096+
chainBucket := nodeChanBucket.NestedReadWriteBucket(
2097+
info.ChainHash[:],
2098+
)
2099+
if chainBucket == nil {
2100+
return c.removePendingCleanup(tx, &info.ChanPoint)
2101+
}
2102+
2103+
var chanPointBuf bytes.Buffer
2104+
err := graphdb.WriteOutpoint(&chanPointBuf, &info.ChanPoint)
2105+
if err != nil {
2106+
return err
2107+
}
2108+
chanKey := chanPointBuf.Bytes()
2109+
2110+
chanBucket := chainBucket.NestedReadWriteBucket(chanKey)
2111+
if chanBucket == nil {
2112+
// Channel bucket doesn't exist, just remove the
2113+
// pending cleanup entry.
2114+
return c.removePendingCleanup(tx, &info.ChanPoint)
2115+
}
2116+
2117+
// Fetch the channel state to get the packager.
2118+
chanState, err := fetchOpenChannel(
2119+
chanBucket, &info.ChanPoint,
2120+
)
2121+
if err != nil {
2122+
return err
2123+
}
2124+
2125+
// Delete all the forwarding packages stored for this channel.
2126+
if err := chanState.Packager.Wipe(tx); err != nil {
2127+
return err
2128+
}
2129+
2130+
// Purge the remaining channel metadata from the database.
2131+
if err := deleteOpenChannel(chanBucket); err != nil {
2132+
return err
2133+
}
2134+
2135+
// Remove the channel from the frozen channel bucket if needed.
2136+
if chanState.ChanType.IsFrozen() ||
2137+
chanState.ChanType.HasLeaseExpiration() {
2138+
2139+
if err := deleteThawHeight(chanBucket); err != nil {
2140+
return err
2141+
}
2142+
}
2143+
2144+
// Delete the information stored within the revocation log.
2145+
if err := deleteLogBucket(chanBucket); err != nil {
2146+
return err
2147+
}
2148+
2149+
// Delete the channel bucket itself.
2150+
if err := chainBucket.DeleteNestedBucket(chanKey); err != nil {
2151+
return err
2152+
}
2153+
2154+
// Finally, remove the pending cleanup entry.
2155+
return c.removePendingCleanup(tx, &info.ChanPoint)
2156+
}, func() {})
2157+
}
2158+
2159+
// removePendingCleanup removes a channel's entry from the pending cleanup
2160+
// bucket.
2161+
func (c *ChannelStateDB) removePendingCleanup(tx kvdb.RwTx,
2162+
chanPoint *wire.OutPoint) error {
2163+
2164+
cleanupBucket := tx.ReadWriteBucket(pendingCleanupBucket)
2165+
if cleanupBucket == nil {
2166+
return nil
2167+
}
2168+
2169+
var chanPointBuf bytes.Buffer
2170+
if err := graphdb.WriteOutpoint(&chanPointBuf, chanPoint); err != nil {
2171+
return err
2172+
}
2173+
2174+
return cleanupBucket.Delete(chanPointBuf.Bytes())
2175+
}
2176+
20262177
// MakeTestInvoiceDB is used to create a test invoice database for testing
20272178
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
20282179
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (

0 commit comments

Comments
 (0)