Files
dchain/relay/feed_mailbox.go
vsecoder a75cbcd224 feat: resource caps, Saved Messages, author walls, docs for node bring-up
Node flags (cmd/node/main.go):
  --max-cpu / --max-ram-mb — Go runtime caps (GOMAXPROCS / GOMEMLIMIT)
  --feed-disk-limit-mb — hard 507 refusal for new post bodies over quota
  --chain-disk-limit-mb — advisory watcher (can't reject blocks without
  breaking consensus; logs WARN every minute)

Client — Saved Messages (self-chat):
  - Auto-created on sign-in, pinned top of chat list, blue bookmark avatar
  - Send short-circuits the relay (no encrypt, no fee, no mailbox hop)
  - Empty state rendered outside inverted FlatList — fixes the mirrored
    "say hi…" on Android RTL-aware layout builds
  - PostCard shows "You" for own posts instead of the self-contact alias

Client — user walls:
  - New route /(app)/feed/author/[pub] with infinite-scroll via
    `created_at` cursor and pull-to-refresh
  - Profile screen gains "View posts" button (universal) next to
    "Open chat" (contact-only)

Feed pipeline:
  - Bump client JPEG quality 0.5 → 0.75 to match server scrubber (Q=75),
    so a 60 KiB compose doesn't balloon past 256 KiB after server re-encode
  - ErrPostTooLarge now wraps with the actual size vs cap, errors.Is
    preserved in the HTTP layer
  - FeedMailbox quota + DiskUsage surface — supports new CLI flag

README:
  - Step-by-step "first node / joiner" section on the landing page,
    full flag tables incl. the new resource-cap group, minimal
    checklists for open/private/low-end deployments
2026-04-19 13:14:47 +03:00

469 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
// FeedMailbox — BadgerDB-backed storage for social-feed post bodies.
//
// Posts are PUBLIC (plaintext) — unlike the E2E inbox envelopes, feed posts
// have no recipient key. They live keyed by post ID and can be read by
// anyone via GET /feed/post/{id}.
//
// Storage layout (keys):
//
// feedpost:<postID> → FeedPost JSON (body + metadata)
// feedauthor:<author>:<ts> → postID (chrono index for GET /feed/author)
// feedview:<postID> → uint64 big-endian (view counter)
// feedtag:<tag>:<ts>:<postID> → postID (inverted index for #tag search)
// feedtrend:<score>:<postID> → postID (ranked index; score = likes × 2 + views)
//
// View counts are off-chain because on-chain would mean one tx per view —
// financially and architecturally unreasonable. Likes stay on-chain
// (provable authorship + anti-Sybil via fee).
//
// Anti-spam:
// - MaxPostBodySize is enforced at Store time.
// - Per-sender rate limiting happens at the HTTP layer (withSubmitTxGuards).
import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
	"unicode/utf8"

	badger "github.com/dgraph-io/badger/v4"
)
const (
	// Key prefixes. Index keys embed a zero-padded %020d timestamp so plain
	// lexicographic key order doubles as chronological order.
	feedPostPrefix      = "feedpost:"   // feedpost:<postID> → FeedPost JSON
	feedAuthorIdxPrefix = "feedauthor:" // feedauthor:<author>:<ts_20d>:<postID>
	feedViewPrefix      = "feedview:"   // feedview:<postID> → uint64 big-endian
	feedHashtagPrefix   = "feedtag:"    // feedtag:<tag>:<ts_20d>:<postID>
	feedTrendingPrefix  = "feedtrend:"  // feedtrend:<score_20d_inv>:<postID>
	// MaxPostBodySize is the hard cap on a post's on-wire size. Matches
	// blockchain.MaxPostSize so the on-chain fee estimate is always
	// enforceable (no "I claimed 10 KiB but sent 50 KiB" trick).
	MaxPostBodySize = 256 * 1024 // 256 KiB
	// FeedPostDefaultTTLDays is how long a post body lives before BadgerDB
	// auto-evicts it. On-chain metadata stays forever, so a reader hitting
	// a stale post sees the record with a "body unavailable" indicator.
	// Configurable via the env var DCHAIN_FEED_TTL_DAYS (handled in main.go).
	FeedPostDefaultTTLDays = 30
	// maxHashtagsPerPost caps how many distinct hashtags we'll index per
	// post. Prevents a spammer from polluting every tag namespace with one
	// mega-post. Enforced in Store after extractHashtags dedup.
	maxHashtagsPerPost = 8
	// trendingHalfLifeSeconds controls how quickly a post's score decays.
	// Used when computing "trending": recent engagement weighs more than old.
	// NOTE(review): this constant and feedTrendingPrefix are not referenced
	// anywhere in this file — presumably consumed by the trending endpoint
	// elsewhere; confirm before removing.
	trendingHalfLifeSeconds = 12 * 3600 // 12 hours
)
// FeedPost is the off-chain body. On-chain we keep the metadata in
// blockchain.PostRecord — here we store the readable payload.
//
// Why not just put the body on-chain? Size — a 256 KiB post × thousands
// per day would bloat the block history. Keeping it in a relay DB with a
// TTL gives us ephemerality while still letting on-chain records serve as
// the permanent proof of authorship.
type FeedPost struct {
	// Identity (matches on-chain PostRecord.PostID).
	PostID string `json:"post_id"`
	Author string `json:"author"` // Ed25519 hex
	// Payload. Content is always plaintext (posts are public). Attachment is
	// a pre-compressed blob — the client is expected to have minimised size
	// before publish. If empty, the post is text-only.
	Content        string   `json:"content"`
	ContentType    string   `json:"content_type,omitempty"` // "text/plain" | "text/markdown" | ...
	Attachment     []byte   `json:"attachment,omitempty"`
	AttachmentMIME string   `json:"attachment_mime,omitempty"`
	Hashtags       []string `json:"hashtags,omitempty"` // lowercased, without leading #; re-derived from Content in Store
	// CreatedAt matches the on-chain tx timestamp — we stamp it server-side
	// at Store() so senders can't back-date.
	CreatedAt int64 `json:"created_at"`
	// ReplyTo / QuoteOf mirror the on-chain PostRecord fields, included
	// here so the client can thread without a second RPC.
	ReplyTo string `json:"reply_to,omitempty"`
	QuoteOf string `json:"quote_of,omitempty"`
}
// ErrPostTooLarge is returned by Store when the post body exceeds
// MaxPostBodySize. Store wraps it with the offending size so callers can
// both errors.Is() on the sentinel and surface the numbers.
var ErrPostTooLarge = errors.New("post body exceeds maximum allowed size")

// ErrFeedQuotaExceeded is returned by Store when the on-disk footprint
// (LSM + value log) plus the incoming post would exceed the operator-set
// disk quota. Ops set this via --feed-disk-limit-mb. Zero = unlimited.
var ErrFeedQuotaExceeded = errors.New("feed mailbox disk quota exceeded")
// FeedMailbox stores feed post bodies in a BadgerDB instance, together with
// the author/hashtag indices and off-chain view counters.
type FeedMailbox struct {
	db         *badger.DB    // underlying key-value store (may be shared; see NewFeedMailbox)
	ttl        time.Duration // lifetime applied to every entry written
	quotaBytes int64         // disk-quota ceiling for Store; 0 = unlimited
}
// NewFeedMailbox wraps an already-open Badger DB. A non-positive ttl falls
// back to FeedPostDefaultTTLDays (post bodies auto-evict after that; the
// on-chain metadata persists forever independently). A negative quotaBytes
// is normalised to 0, meaning unlimited disk usage.
func NewFeedMailbox(db *badger.DB, ttl time.Duration, quotaBytes int64) *FeedMailbox {
	effectiveTTL := ttl
	if effectiveTTL <= 0 {
		effectiveTTL = time.Duration(FeedPostDefaultTTLDays) * 24 * time.Hour
	}
	quota := quotaBytes
	if quota < 0 {
		quota = 0
	}
	return &FeedMailbox{
		db:         db,
		ttl:        effectiveTTL,
		quotaBytes: quota,
	}
}
// OpenFeedMailbox opens (or creates) a dedicated BadgerDB at dbPath and
// wraps it in a FeedMailbox. quotaBytes caps the total on-disk footprint
// (LSM + vlog); 0 = unlimited.
func OpenFeedMailbox(dbPath string, ttl time.Duration, quotaBytes int64) (*FeedMailbox, error) {
	options := badger.DefaultOptions(dbPath)
	options = options.WithLogger(nil)                 // silence Badger's chatty default logger
	options = options.WithValueLogFileSize(128 << 20) // 128 MiB value-log segments
	options = options.WithNumVersionsToKeep(1)        // post bodies are never versioned
	options = options.WithCompactL0OnClose(true)      // leave the LSM tidy on shutdown

	db, err := badger.Open(options)
	if err != nil {
		return nil, fmt.Errorf("open feed mailbox db: %w", err)
	}
	return NewFeedMailbox(db, ttl, quotaBytes), nil
}
// DiskUsage returns the current on-disk footprint (LSM tree + value log)
// in bytes. Cheap: Badger tracks both counters internally.
func (fm *FeedMailbox) DiskUsage() int64 {
	lsmBytes, vlogBytes := fm.db.Size()
	return lsmBytes + vlogBytes
}
// Quota returns the configured disk quota in bytes. 0 = unlimited.
func (fm *FeedMailbox) Quota() int64 { return fm.quotaBytes }

// Close releases the underlying Badger handle. After Close, all other
// methods on this FeedMailbox will error.
func (fm *FeedMailbox) Close() error { return fm.db.Close() }
// Store persists a post body and updates all indices. `createdAt` is the
// canonical timestamp (usually from the chain tx) and becomes the
// server's view of when the post happened — clients' wall-clock values
// are ignored (post.CreatedAt is overwritten below before marshalling).
//
// Returns the set of hashtags actually indexed (after dedup + cap).
func (fm *FeedMailbox) Store(post *FeedPost, createdAt int64) ([]string, error) {
	// Size gate first — cheapest check, and the error carries the numbers.
	size := estimatePostSize(post)
	if size > MaxPostBodySize {
		// Wrap the sentinel so the HTTP layer can still errors.Is() on it
		// while the operator / client sees the actual offending numbers.
		// This catches the common case where the client's pre-scrub
		// estimate is below the cap but the server re-encode (quality=75
		// JPEG) inflates past it.
		return nil, fmt.Errorf("%w: size %d > max %d (after server scrub)",
			ErrPostTooLarge, size, MaxPostBodySize)
	}
	// Disk quota: refuse new bodies once we're already over the cap.
	// `size` is a post-body estimate, not the exact BadgerDB write-amp
	// cost; we accept that slack — the goal is a coarse guard-rail so
	// an operator's disk doesn't blow up unnoticed. Exceeding nodes
	// still serve existing posts; only new Store() calls are refused.
	// NOTE(review): this check runs before the idempotency lookup below,
	// so re-storing an already-stored post while over quota reports
	// ErrFeedQuotaExceeded instead of succeeding as a no-op — confirm
	// that's the intended behaviour.
	if fm.quotaBytes > 0 {
		if fm.DiskUsage()+int64(size) > fm.quotaBytes {
			return nil, ErrFeedQuotaExceeded
		}
	}
	post.CreatedAt = createdAt
	// Normalise hashtags — the client may or may not have supplied them;
	// we derive from Content as the authoritative source, then dedup.
	tags := extractHashtags(post.Content)
	if len(tags) > maxHashtagsPerPost {
		tags = tags[:maxHashtagsPerPost]
	}
	post.Hashtags = tags
	val, err := json.Marshal(post)
	if err != nil {
		return nil, fmt.Errorf("marshal post: %w", err)
	}
	err = fm.db.Update(func(txn *badger.Txn) error {
		// Idempotent on postID — second Store is a no-op.
		key := []byte(feedPostPrefix + post.PostID)
		if _, err := txn.Get(key); err == nil {
			return nil
		}
		// Body write carries the TTL; index entries below share it so they
		// evict together with the body.
		entry := badger.NewEntry(key, val).WithTTL(fm.ttl)
		if err := txn.SetEntry(entry); err != nil {
			return err
		}
		// Author chrono index — %020d zero-pads the timestamp so key order
		// is chronological.
		authorKey := fmt.Sprintf("%s%s:%020d:%s", feedAuthorIdxPrefix, post.Author, createdAt, post.PostID)
		if err := txn.SetEntry(
			badger.NewEntry([]byte(authorKey), []byte(post.PostID)).WithTTL(fm.ttl),
		); err != nil {
			return err
		}
		// Hashtag inverted index — one entry per (tag, post).
		for _, tag := range tags {
			tagKey := fmt.Sprintf("%s%s:%020d:%s", feedHashtagPrefix, tag, createdAt, post.PostID)
			if err := txn.SetEntry(
				badger.NewEntry([]byte(tagKey), []byte(post.PostID)).WithTTL(fm.ttl),
			); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return tags, nil
}
// Get loads the full post body for postID. A missing or TTL-evicted post
// yields (nil, nil) rather than an error; any other storage failure is
// returned as-is.
func (fm *FeedMailbox) Get(postID string) (*FeedPost, error) {
	var post FeedPost
	viewErr := fm.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(feedPostPrefix + postID))
		if err != nil {
			return err
		}
		return item.Value(func(raw []byte) error {
			return json.Unmarshal(raw, &post)
		})
	})
	switch {
	case errors.Is(viewErr, badger.ErrKeyNotFound):
		return nil, nil
	case viewErr != nil:
		return nil, viewErr
	}
	return &post, nil
}
// Delete removes a post body and every record keyed off it: the body, the
// author chrono index, the hashtag index entries, and the off-chain view
// counter. The on-chain soft-delete marker stays; this just frees storage.
// Called by the DELETE_POST event handler hook. Deleting an unknown or
// already-evicted post is a no-op.
func (fm *FeedMailbox) Delete(postID string) error {
	// Index keys embed author and createdAt — fetch the body first to
	// reconstruct them.
	post, err := fm.Get(postID)
	if err != nil {
		return err
	}
	if post == nil {
		return nil // already gone (or TTL-evicted) — nothing to clean up
	}
	return fm.db.Update(func(txn *badger.Txn) error {
		if err := txn.Delete([]byte(feedPostPrefix + postID)); err != nil {
			return err
		}
		// View counter: previously leaked until TTL expiry, which let
		// IncrementView keep bumping an orphan counter for a deleted post.
		// Badger's Delete on a missing key just writes a tombstone — no error.
		if err := txn.Delete([]byte(feedViewPrefix + postID)); err != nil {
			return err
		}
		authorKey := fmt.Sprintf("%s%s:%020d:%s",
			feedAuthorIdxPrefix, post.Author, post.CreatedAt, postID)
		if err := txn.Delete([]byte(authorKey)); err != nil {
			return err
		}
		for _, tag := range post.Hashtags {
			tagKey := fmt.Sprintf("%s%s:%020d:%s",
				feedHashtagPrefix, tag, post.CreatedAt, postID)
			if err := txn.Delete([]byte(tagKey)); err != nil {
				return err
			}
		}
		return nil
	})
}
// IncrementView bumps the off-chain view counter for postID and returns the
// new value; a missing counter starts at zero, so the first view yields 1.
// Counters are ephemeral (each write refreshes the TTL) — a fresh relay that
// gossip-loads an old post restarts from 0, which is acceptable for a
// non-authoritative metric.
func (fm *FeedMailbox) IncrementView(postID string) (uint64, error) {
	var updated uint64
	key := []byte(feedViewPrefix + postID)
	err := fm.db.Update(func(txn *badger.Txn) error {
		var current uint64
		item, getErr := txn.Get(key)
		switch {
		case getErr == nil:
			// Best-effort decode; a malformed value just reads as zero.
			_ = item.Value(func(raw []byte) error {
				if len(raw) == 8 {
					current = binary.BigEndian.Uint64(raw)
				}
				return nil
			})
		case !errors.Is(getErr, badger.ErrKeyNotFound):
			return getErr
		}
		updated = current + 1
		var encoded [8]byte
		binary.BigEndian.PutUint64(encoded[:], updated)
		return txn.SetEntry(badger.NewEntry(key, encoded[:]).WithTTL(fm.ttl))
	})
	return updated, err
}
// ViewCount reports the current (off-chain) view counter for postID.
// A missing or evicted counter reads as zero rather than an error.
func (fm *FeedMailbox) ViewCount(postID string) (uint64, error) {
	var count uint64
	readErr := fm.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(feedViewPrefix + postID))
		if err != nil {
			if errors.Is(err, badger.ErrKeyNotFound) {
				return nil // no counter yet — report 0
			}
			return err
		}
		return item.Value(func(raw []byte) error {
			// Only well-formed 8-byte big-endian values are trusted.
			if len(raw) == 8 {
				count = binary.BigEndian.Uint64(raw)
			}
			return nil
		})
	})
	return count, readErr
}
// PostsByAuthor returns up to limit post IDs by authorPub, newest first.
// Out-of-range limits (<=0 or >200) fall back to 50. IDs only — callers
// resolve bodies via Get.
func (fm *FeedMailbox) PostsByAuthor(authorPub string, limit int) ([]string, error) {
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	return fm.reverseIDScan([]byte(feedAuthorIdxPrefix+authorPub+":"), limit)
}
// PostsByHashtag returns up to limit post IDs tagged with tag, newest first.
// The tag is normalised (lowercased, leading '#' stripped); an empty tag
// after normalisation yields (nil, nil). Out-of-range limits fall back to 50.
func (fm *FeedMailbox) PostsByHashtag(tag string, limit int) ([]string, error) {
	normalized := strings.ToLower(strings.TrimPrefix(tag, "#"))
	if normalized == "" {
		return nil, nil
	}
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	return fm.reverseIDScan([]byte(feedHashtagPrefix+normalized+":"), limit)
}
// reverseIDScan walks all keys under prefix in reverse lexicographic order
// and collects each entry's value (a postID) up to limit. Because the chrono
// indices embed a zero-padded %020d timestamp, reverse lex order is
// newest-first.
func (fm *FeedMailbox) reverseIDScan(prefix []byte, limit int) ([]string, error) {
	out := make([]string, 0, limit)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		opts.Reverse = true
		// A reverse iterator must be seeded at a key >= every key in the
		// prefix range; appending 0xff to a copy of the prefix does that.
		// NOTE(review): this assumes no key byte after the prefix is 0xff —
		// holds for the ASCII/hex IDs used here; confirm if key formats change.
		seek := append([]byte{}, prefix...)
		seek = append(seek, 0xff)
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Seek(seek); it.ValidForPrefix(prefix) && len(out) < limit; it.Next() {
			item := it.Item()
			// Value errors are deliberately ignored: one unreadable entry
			// should not abort the whole listing.
			_ = item.Value(func(val []byte) error {
				out = append(out, string(val))
				return nil
			})
		}
		return nil
	})
	return out, err
}
// RecentPostIDs enumerates the most recent posts stored by this relay
// across ALL authors, newest first. Used by the trending / recommendations
// endpoints to seed the candidate pool. maxAgeSeconds bounds the walk
// (0 = no age bound); limit is clamped into (0, 500] with a default of 100.
func (fm *FeedMailbox) RecentPostIDs(maxAgeSeconds int64, limit int) ([]string, error) {
	if limit <= 0 || limit > 500 {
		limit = 100
	}
	// Can't reuse chrono indices because they're per-author. We scan post:*
	// and collect, sorted by CreatedAt from the decoded body. This is O(M)
	// where M = #posts in DB — fine for MVP since TTL-bounded M is small
	// (~5k posts × 30d TTL on a busy node).
	type candidate struct {
		id string // PostID from the decoded body
		ts int64  // CreatedAt, used as the sort key
	}
	cutoff := int64(0)
	if maxAgeSeconds > 0 {
		cutoff = time.Now().Unix() - maxAgeSeconds
	}
	var candidates []candidate
	prefix := []byte(feedPostPrefix)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			err := item.Value(func(val []byte) error {
				var p FeedPost
				if err := json.Unmarshal(val, &p); err != nil {
					return nil // skip corrupt entries rather than failing the scan
				}
				if p.CreatedAt < cutoff {
					return nil // older than the age bound — skip
				}
				candidates = append(candidates, candidate{id: p.PostID, ts: p.CreatedAt})
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Newest first, then truncate to limit.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts })
	if len(candidates) > limit {
		candidates = candidates[:limit]
	}
	out := make([]string, len(candidates))
	for i, c := range candidates {
		out[i] = c.id
	}
	return out, nil
}
// extractHashtags finds #word tokens in text, lowercases them, removes
// duplicates, and preserves first-seen order. A word is 1..40 characters
// drawn from [A-Za-z0-9_] plus any Unicode letter (\p{L}).
//
// The length cap is counted in runes, matching the regex's repetition count.
// The previous byte-length check wrongly dropped tags made of multi-byte
// letters (e.g. 25 Cyrillic letters = 50 bytes but only 25 characters).
func extractHashtags(text string) []string {
	matches := hashtagRegex.FindAllString(text, -1)
	seen := make(map[string]struct{}, len(matches))
	out := make([]string, 0, len(matches))
	for _, m := range matches {
		tag := strings.ToLower(strings.TrimPrefix(m, "#"))
		// Rune count, not byte count — keeps Unicode tags the regex accepts.
		if n := utf8.RuneCountInString(tag); n == 0 || n > 40 {
			continue
		}
		if _, dup := seen[tag]; dup {
			continue
		}
		seen[tag] = struct{}{}
		out = append(out, tag)
	}
	return out
}

// hashtagRegex matches '#' followed by 1–40 word characters: ASCII
// letters/digits/underscore or any Unicode letter.
var hashtagRegex = regexp.MustCompile(`#[A-Za-z0-9_\p{L}]{1,40}`)
// estimatePostSize returns the approximate on-disk size of a post, used for
// the fee calculation and the MaxPostBodySize check. It mirrors the client's
// pre-publish estimate so fees stay predictable: payload bytes plus a flat
// allowance for JSON metadata scaffolding.
func estimatePostSize(post *FeedPost) uint64 {
	const metadataOverhead = 128 // ~120 bytes of JSON keys/quotes/IDs, rounded up
	payload := len(post.Content) + len(post.Attachment)
	return uint64(payload) + metadataOverhead
}