The mailbox previously trusted the client-supplied envelope ID and SentAt,
which enabled two attacks:
- replay via re-broadcast: a malicious relay could resubmit the same
ciphertext under multiple IDs, causing the recipient to receive the
same plaintext repeatedly;
- timestamp spoofing: senders could back-date or future-date messages
to bypass the 7-day TTL or fake chronology.
Store() now recomputes env.ID as hex(sha256(nonce||ct)[:16]) and
overwrites env.SentAt with time.Now().Unix(). Both values are mutated
on the envelope pointer so downstream gossipsub publishes agree on the
normalised form.
Also documents /relay/send as non-E2E — the endpoint seals with the
relay's own key, which breaks end-to-end authenticity. Clients wanting
real E2E should POST /relay/broadcast with a pre-sealed envelope.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
284 lines
9.4 KiB
Go
284 lines
9.4 KiB
Go
package relay
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
badger "github.com/dgraph-io/badger/v4"
|
|
)
|
|
|
|
const (
	// mailboxTTL is the retention window for envelopes that have not been
	// picked up; it is applied as the entry TTL when an envelope is written.
	mailboxTTL = 7 * 24 * time.Hour

	// mailboxPrefix starts every BadgerDB key that holds a stored envelope.
	// Full key layout: mail:<recipientX25519Hex>:<sentAt_20d>:<envelopeID>
	mailboxPrefix = "mail:"

	// MailboxMaxLimit is the upper bound on how many envelopes a single
	// List query hands back.
	MailboxMaxLimit = 200

	// MailboxPerRecipientCap bounds how many envelopes are kept for one
	// recipient key. Once the bound is hit, writing a new envelope first
	// evicts the oldest one (sliding window, FIFO).
	// With a 64 KB ceiling per envelope this keeps one recipient at
	// roughly 32 MB at most.
	MailboxPerRecipientCap = 500

	// MailboxMaxEnvelopeSize is the largest ciphertext length, in bytes,
	// that Store accepts; anything longer is refused before touching disk.
	MailboxMaxEnvelopeSize = 64 << 10 // 64 KB
)
|
|
|
|
// Sentinel errors for the mailbox.
var (
	// ErrEnvelopeTooLarge is what Store returns when an envelope's
	// ciphertext is longer than the permitted size limit.
	ErrEnvelopeTooLarge = errors.New("envelope ciphertext exceeds maximum allowed size")

	// errMailboxFull marks a recipient mailbox that has reached capacity.
	// Callers never receive it, because a full mailbox evicts its oldest
	// entry instead of failing; it is kept as a sentinel for internal logic.
	errMailboxFull = errors.New("recipient mailbox is at capacity")
)
|
|
|
|
// Mailbox is a BadgerDB-backed store for relay envelopes awaiting pickup.
|
|
// Every received envelope is stored with a 7-day TTL regardless of whether
|
|
// the recipient is currently online. Recipients poll GET /relay/inbox to fetch
|
|
// and DELETE /relay/inbox/{id} to acknowledge delivery.
|
|
//
|
|
// Anti-spam guarantees:
|
|
// - Envelopes larger than MailboxMaxEnvelopeSize (64 KB) are rejected.
|
|
// - At most MailboxPerRecipientCap (500) envelopes per recipient are stored;
|
|
// when the cap is hit the oldest entry is silently evicted (FIFO).
|
|
// - All entries expire automatically after 7 days (BadgerDB TTL).
|
|
//
|
|
// Messages are stored encrypted — the relay cannot read their contents.
|
|
type Mailbox struct {
|
|
db *badger.DB
|
|
|
|
// onStore, if set, is invoked after every successful Store. Used by the
|
|
// node to push a WebSocket `inbox` event to subscribers of the
|
|
// recipient's x25519 pubkey so the mobile client stops polling
|
|
// /relay/inbox every 3 seconds.
|
|
//
|
|
// The callback MUST NOT block — it runs on the writer goroutine. Long
|
|
// work should be fanned out to a goroutine by the callback itself.
|
|
onStore func(*Envelope)
|
|
}
|
|
|
|
// SetOnStore registers a post-Store hook. Pass nil to clear. Safe to call
|
|
// before accepting traffic (wired once at node startup in main.go).
|
|
func (m *Mailbox) SetOnStore(cb func(*Envelope)) {
|
|
m.onStore = cb
|
|
}
|
|
|
|
// NewMailbox creates a Mailbox backed by the given BadgerDB instance.
|
|
func NewMailbox(db *badger.DB) *Mailbox {
|
|
return &Mailbox{db: db}
|
|
}
|
|
|
|
// OpenMailbox opens (or creates) a dedicated BadgerDB at dbPath for the mailbox.
|
|
//
|
|
// Storage tuning matches blockchain/chain.NewChain — 64 MiB vlog files
|
|
// (instead of 1 GiB default) so GC can actually shrink the DB, and single
|
|
// version retention since envelopes are either present or deleted.
|
|
func OpenMailbox(dbPath string) (*Mailbox, error) {
|
|
opts := badger.DefaultOptions(dbPath).
|
|
WithLogger(nil).
|
|
WithValueLogFileSize(64 << 20).
|
|
WithNumVersionsToKeep(1).
|
|
WithCompactL0OnClose(true)
|
|
db, err := badger.Open(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open mailbox db: %w", err)
|
|
}
|
|
return &Mailbox{db: db}, nil
|
|
}
|
|
|
|
// Close closes the underlying database.
|
|
func (m *Mailbox) Close() error { return m.db.Close() }
|
|
|
|
// Store persists an envelope with a 7-day TTL.
|
|
//
|
|
// Anti-spam checks (in order):
|
|
// 1. Ciphertext > MailboxMaxEnvelopeSize → returns ErrEnvelopeTooLarge.
|
|
// 2. env.ID is recomputed to the canonical value hex(sha256(nonce||ct)[:16])
|
|
// — prevents a malicious relay from storing the same ciphertext under
|
|
// multiple IDs (real content-level replay protection).
|
|
// 3. env.SentAt is overwritten with server time — senders can't back-date
|
|
// or future-date messages to bypass ordering or TTL expiry.
|
|
// 4. Duplicate envelope ID → silently no-op (idempotent).
|
|
// 5. Recipient already has MailboxPerRecipientCap entries → oldest evicted first.
|
|
//
|
|
// NOTE: Store MUTATES env.ID and env.SentAt to the canonical / server values.
|
|
// Callers that re-broadcast (gossipsub publish) after Store will see the
|
|
// normalised envelope, which is desirable — peer nodes then agree on the
|
|
// same ID and timestamp.
|
|
func (m *Mailbox) Store(env *Envelope) error {
|
|
if len(env.Ciphertext) > MailboxMaxEnvelopeSize {
|
|
return ErrEnvelopeTooLarge
|
|
}
|
|
|
|
// v1.0.1 — canonicalise id & timestamp. Any client-supplied values are
|
|
// replaced with server-computed truth. This is the simplest way to
|
|
// prevent:
|
|
// - replay-via-rebroadcast (same ciphertext under different IDs),
|
|
// - timestamp spoofing (bypass TTL / fake chronology).
|
|
env.ID = envelopeID(env.Nonce, env.Ciphertext)
|
|
env.SentAt = time.Now().Unix()
|
|
|
|
key := mailboxKey(env.RecipientPub, env.SentAt, env.ID)
|
|
val, err := json.Marshal(env)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal envelope: %w", err)
|
|
}
|
|
|
|
// Track whether this was a fresh insert (vs. duplicate) so we can skip
|
|
// firing the WS hook for idempotent resubmits — otherwise a misbehaving
|
|
// sender could amplify events by spamming the same envelope ID.
|
|
fresh := false
|
|
err = m.db.Update(func(txn *badger.Txn) error {
|
|
// Check if this exact envelope is already stored (idempotent).
|
|
if _, err := txn.Get([]byte(key)); err == nil {
|
|
return nil // already present, no-op
|
|
}
|
|
|
|
// Count existing envelopes for this recipient and collect the oldest key.
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, env.RecipientPub))
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
|
|
var count int
|
|
var oldestKey []byte
|
|
it := txn.NewIterator(opts)
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
if count == 0 {
|
|
oldestKey = it.Item().KeyCopy(nil) // first = oldest (sorted by sentAt)
|
|
}
|
|
count++
|
|
}
|
|
it.Close()
|
|
|
|
// Evict the oldest envelope if cap is reached.
|
|
if count >= MailboxPerRecipientCap && oldestKey != nil {
|
|
if err := txn.Delete(oldestKey); err != nil {
|
|
return fmt.Errorf("evict oldest envelope: %w", err)
|
|
}
|
|
}
|
|
|
|
e := badger.NewEntry([]byte(key), val).WithTTL(mailboxTTL)
|
|
if err := txn.SetEntry(e); err != nil {
|
|
return err
|
|
}
|
|
fresh = true
|
|
return nil
|
|
})
|
|
if err == nil && fresh && m.onStore != nil {
|
|
m.onStore(env)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// List returns up to limit envelopes for recipientPubHex, ordered oldest-first.
|
|
// Pass since > 0 to skip envelopes with SentAt < since (unix timestamp).
|
|
func (m *Mailbox) List(recipientPubHex string, since int64, limit int) ([]*Envelope, error) {
|
|
if limit <= 0 || limit > MailboxMaxLimit {
|
|
limit = MailboxMaxLimit
|
|
}
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
var out []*Envelope
|
|
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
for it.Rewind(); it.Valid() && len(out) < limit; it.Next() {
|
|
if err := it.Item().Value(func(val []byte) error {
|
|
var env Envelope
|
|
if err := json.Unmarshal(val, &env); err != nil {
|
|
return nil // skip corrupt entries
|
|
}
|
|
if since > 0 && env.SentAt < since {
|
|
return nil
|
|
}
|
|
out = append(out, &env)
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return out, err
|
|
}
|
|
|
|
// Delete removes an envelope by ID.
|
|
// It scans the recipient's prefix to locate the full key (sentAt is not known by caller).
|
|
// Returns nil if the envelope is not found (already expired or never stored).
|
|
func (m *Mailbox) Delete(recipientPubHex, envelopeID string) error {
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
var found []byte
|
|
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
suffix := ":" + envelopeID
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
key := it.Item().KeyCopy(nil)
|
|
if strings.HasSuffix(string(key), suffix) {
|
|
found = key
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil || found == nil {
|
|
return err
|
|
}
|
|
|
|
return m.db.Update(func(txn *badger.Txn) error {
|
|
return txn.Delete(found)
|
|
})
|
|
}
|
|
|
|
// Count returns the number of stored envelopes for a recipient.
|
|
func (m *Mailbox) Count(recipientPubHex string) (int, error) {
|
|
prefix := []byte(fmt.Sprintf("%s%s:", mailboxPrefix, recipientPubHex))
|
|
count := 0
|
|
err := m.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Prefix = prefix
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
for it.Rewind(); it.Valid(); it.Next() {
|
|
count++
|
|
}
|
|
return nil
|
|
})
|
|
return count, err
|
|
}
|
|
|
|
// RunGC periodically runs BadgerDB value log garbage collection.
|
|
// Call in a goroutine — blocks until cancelled via channel close or process exit.
|
|
func (m *Mailbox) RunGC() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
for m.db.RunValueLogGC(0.5) == nil {
|
|
// drain until nothing left to collect
|
|
}
|
|
}
|
|
}
|
|
|
|
func mailboxKey(recipientPubHex string, sentAt int64, envelopeID string) string {
|
|
// Zero-padded sentAt keeps lexicographic order == chronological order.
|
|
// Oldest entry = first key in iterator — used for FIFO eviction.
|
|
return fmt.Sprintf("%s%s:%020d:%s", mailboxPrefix, recipientPubHex, sentAt, envelopeID)
|
|
}
|