Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 142 additions & 28 deletions channeldb/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ var (
// is retained.
historicalChannelBucket = []byte("historical-chan-bucket")

// pendingCleanupBucket stores information about channels that have been
// closed but whose data (revocation logs, forwarding packages) has not
// yet been deleted. This is used by SQL backends to defer heavy cleanup
// operations to startup.
pendingCleanupBucket = []byte("pending-cleanup-bucket")

// chanInfoKey can be accessed within the bucket for a channel
// (identified by its chanPoint). This key stores all the static
// information for a channel which is decided at the end of the
Expand Down Expand Up @@ -3759,6 +3765,57 @@ const (
Abandoned ClosureType = 5
)

// PendingCleanupInfo contains the information needed to clean up a channel's
// data after it has been closed. This is used by SQL backends to defer heavy
// deletion operations to startup.
type PendingCleanupInfo struct {
// ChanPoint is the funding outpoint of the channel.
ChanPoint wire.OutPoint

// ShortChanID is the short channel ID of the channel.
ShortChanID lnwire.ShortChannelID

// NodePub is the compressed public key of the remote node.
NodePub [33]byte

// ChainHash is the hash of the chain this channel belongs to.
ChainHash chainhash.Hash
}

// Encode serializes the PendingCleanupInfo to the given writer.
func (p *PendingCleanupInfo) Encode(w io.Writer) error {
if err := WriteElements(w, p.ChanPoint, p.ShortChanID); err != nil {
return err
}

if _, err := w.Write(p.NodePub[:]); err != nil {
return err
}

if _, err := w.Write(p.ChainHash[:]); err != nil {
return err
}

return nil
}

// Decode deserializes the PendingCleanupInfo from the given reader.
func (p *PendingCleanupInfo) Decode(r io.Reader) error {
if err := ReadElements(r, &p.ChanPoint, &p.ShortChanID); err != nil {
return err
}

if _, err := io.ReadFull(r, p.NodePub[:]); err != nil {
return err
}

if _, err := io.ReadFull(r, p.ChainHash[:]); err != nil {
return err
}

return nil
}

// ChannelCloseSummary contains the final state of a channel at the point it
// was closed. Once a channel is closed, all the information pertaining to that
// channel within the openChannelBucket is deleted, and a compact summary is
Expand Down Expand Up @@ -3853,6 +3910,10 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
c.Lock()
defer c.Unlock()

// Check if the backend prefers deferring heavy operations to startup.
// Postgres backends return true here to avoid lock contention.
deferCleanup := kvdb.ShouldDeferHeavyOperations(c.Db.backend)

return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error {
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
if openChanBucket == nil {
Expand Down Expand Up @@ -3893,37 +3954,25 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary,
return err
}

// Delete all the forwarding packages stored for this particular
// channel.
if err = chanState.Packager.Wipe(tx); err != nil {
return err
}

// Now that the index to this channel has been deleted, purge
// the remaining channel metadata from the database.
err = deleteOpenChannel(chanBucket)
if err != nil {
return err
}

// We'll also remove the channel from the frozen channel bucket
// if we need to.
if c.ChanType.IsFrozen() || c.ChanType.HasLeaseExpiration() {
err := deleteThawHeight(chanBucket)
if deferCleanup {
// For postgres backends, store cleanup info and defer
// the heavy deletion operations to startup.
err = storePendingCleanup(
tx, c, nodePub, chanKey,
)
if err != nil {
return err
}
} else {
// For non-postgres backends (bbolt, sqlite), perform
// immediate cleanup.
err = performImmediateCleanup(
tx, chanState, chanBucket, chainBucket,
chanPointBuf.Bytes(),
)
if err != nil {
return err
}
}

// With the base channel data deleted, attempt to delete the
// information stored within the revocation log.
if err := deleteLogBucket(chanBucket); err != nil {
return err
}

err = chainBucket.DeleteNestedBucket(chanPointBuf.Bytes())
if err != nil {
return err
}

// Fetch the outpoint bucket to see if the outpoint exists or
Expand Down Expand Up @@ -4733,6 +4782,71 @@ func deleteOpenChannel(chanBucket kvdb.RwBucket) error {
return nil
}

// storePendingCleanup stores cleanup info for a channel to be processed at
// startup. This is used by postgres backends to defer heavy deletion
// operations.
func storePendingCleanup(tx kvdb.RwTx, c *OpenChannel, nodePub []byte,
chanKey []byte) error {

cleanupBucket, err := tx.CreateTopLevelBucket(pendingCleanupBucket)
if err != nil {
return err
}

var nodePubKey [33]byte
copy(nodePubKey[:], nodePub)

cleanupInfo := &PendingCleanupInfo{
ChanPoint: c.FundingOutpoint,
ShortChanID: c.ShortChannelID,
NodePub: nodePubKey,
ChainHash: c.ChainHash,
}

var cleanupBuf bytes.Buffer
if err := cleanupInfo.Encode(&cleanupBuf); err != nil {
return err
}

return cleanupBucket.Put(chanKey, cleanupBuf.Bytes())
}

// performImmediateCleanup handles the cleanup operations that are performed
// immediately during channel close for non-postgres backends (bbolt, sqlite).
// This includes wiping forwarding packages, deleting channel data, thaw height,
// revocation logs, and the channel bucket itself.
func performImmediateCleanup(tx kvdb.RwTx, chanState *OpenChannel,
chanBucket kvdb.RwBucket, chainBucket kvdb.RwBucket,
chanKey []byte) error {

// Delete all the forwarding packages stored for this channel.
if err := chanState.Packager.Wipe(tx); err != nil {
return err
}

// Purge the remaining channel metadata from the database.
if err := deleteOpenChannel(chanBucket); err != nil {
return err
}

// Remove the channel from the frozen channel bucket if needed.
if chanState.ChanType.IsFrozen() ||
chanState.ChanType.HasLeaseExpiration() {

if err := deleteThawHeight(chanBucket); err != nil {
return err
}
}

// Delete the information stored within the revocation log.
if err := deleteLogBucket(chanBucket); err != nil {
return err
}

// Delete the channel bucket itself.
return chainBucket.DeleteNestedBucket(chanKey)
}

// makeLogKey converts a uint64 into an 8 byte array.
func makeLogKey(updateNum uint64) [8]byte {
var key [8]byte
Expand Down
12 changes: 12 additions & 0 deletions channeldb/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,12 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
t.Fatalf("unable to close channel: %v", err)
}

// For postgres backends, cleanup is deferred. Process it now.
if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) {
err = cdb.CleanupPendingCloses()
require.NoError(t, err, "unable to cleanup pending closes")
}

// As the channel is now closed, attempting to fetch all open channels
// for our fake node ID should return an empty slice.
openChans, err := cdb.FetchOpenChannels(state.IdentityPub)
Expand Down Expand Up @@ -955,6 +961,12 @@ func TestChannelStateTransition(t *testing.T) {
t.Fatalf("unable to delete updated channel: %v", err)
}

// For postgres backends, cleanup is deferred. Process it now.
if kvdb.ShouldDeferHeavyOperations(fullDB.Backend) {
err = cdb.CleanupPendingCloses()
require.NoError(t, err, "unable to cleanup pending closes")
}

// If we attempt to fetch the target channel again, it shouldn't be
// found.
channels, err := cdb.FetchOpenChannels(channel.IdentityPub)
Expand Down
151 changes: 151 additions & 0 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,157 @@ func (c *ChannelStateDB) PutOnchainFinalHtlcOutcome(
}, func() {})
}

// CleanupPendingCloses processes any channels that were closed but whose heavy
// cleanup operations (deleting revocation logs, forwarding packages) were
// deferred to startup. This is used by postgres backends to avoid lock
// contention during normal operation.
func (c *ChannelStateDB) CleanupPendingCloses() error {
// First, collect all the pending cleanup entries.
var cleanupEntries []*PendingCleanupInfo
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
cleanupBucket := tx.ReadBucket(pendingCleanupBucket)
if cleanupBucket == nil {
return nil
}

return cleanupBucket.ForEach(func(k, v []byte) error {
info := &PendingCleanupInfo{}
if err := info.Decode(bytes.NewReader(v)); err != nil {
return err
}

cleanupEntries = append(cleanupEntries, info)

return nil
})
}, func() {
cleanupEntries = nil
})
if err != nil {
return err
}

if len(cleanupEntries) == 0 {
return nil
}

log.Infof("Processing %d deferred channel cleanups",
len(cleanupEntries))

// Process each cleanup entry.
for _, info := range cleanupEntries {
err := c.cleanupChannel(info)
if err != nil {
log.Warnf("Failed to cleanup channel %v: %v",
info.ChanPoint, err)
continue
}

log.Debugf("Cleaned up deferred channel data for %v",
info.ChanPoint)
}

return nil
}

// cleanupChannel performs the actual cleanup for a single channel.
func (c *ChannelStateDB) cleanupChannel(info *PendingCleanupInfo) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
// Get the open channel bucket structure.
openChanBucket := tx.ReadWriteBucket(openChannelBucket)
if openChanBucket == nil {
// If there's no open channel bucket, nothing to clean.
return c.removePendingCleanup(tx, &info.ChanPoint)
}

nodeChanBucket := openChanBucket.NestedReadWriteBucket(
info.NodePub[:],
)
if nodeChanBucket == nil {
return c.removePendingCleanup(tx, &info.ChanPoint)
}

chainBucket := nodeChanBucket.NestedReadWriteBucket(
info.ChainHash[:],
)
if chainBucket == nil {
return c.removePendingCleanup(tx, &info.ChanPoint)
}

var chanPointBuf bytes.Buffer
err := graphdb.WriteOutpoint(&chanPointBuf, &info.ChanPoint)
if err != nil {
return err
}
chanKey := chanPointBuf.Bytes()

chanBucket := chainBucket.NestedReadWriteBucket(chanKey)
if chanBucket == nil {
// Channel bucket doesn't exist, just remove the
// pending cleanup entry.
return c.removePendingCleanup(tx, &info.ChanPoint)
}

// Fetch the channel state to get the packager.
chanState, err := fetchOpenChannel(
chanBucket, &info.ChanPoint,
)
if err != nil {
return err
}

// Delete all the forwarding packages stored for this channel.
if err := chanState.Packager.Wipe(tx); err != nil {
return err
}

// Purge the remaining channel metadata from the database.
if err := deleteOpenChannel(chanBucket); err != nil {
return err
}

// Remove the channel from the frozen channel bucket if needed.
if chanState.ChanType.IsFrozen() ||
chanState.ChanType.HasLeaseExpiration() {

if err := deleteThawHeight(chanBucket); err != nil {
return err
}
}

// Delete the information stored within the revocation log.
if err := deleteLogBucket(chanBucket); err != nil {
return err
}

// Delete the channel bucket itself.
if err := chainBucket.DeleteNestedBucket(chanKey); err != nil {
return err
}

// Finally, remove the pending cleanup entry.
return c.removePendingCleanup(tx, &info.ChanPoint)
}, func() {})
}

// removePendingCleanup removes a channel's entry from the pending cleanup
// bucket.
func (c *ChannelStateDB) removePendingCleanup(tx kvdb.RwTx,
chanPoint *wire.OutPoint) error {

cleanupBucket := tx.ReadWriteBucket(pendingCleanupBucket)
if cleanupBucket == nil {
return nil
}

var chanPointBuf bytes.Buffer
if err := graphdb.WriteOutpoint(&chanPointBuf, chanPoint); err != nil {
return err
}

return cleanupBucket.Delete(chanPointBuf.Bytes())
}

// MakeTestInvoiceDB is used to create a test invoice database for testing
// purposes. It simply calls into MakeTestDB so the same modifiers can be used.
func MakeTestInvoiceDB(t *testing.T, modifiers ...OptionModifier) (
Expand Down
Loading
Loading