Node flags (cmd/node/main.go):
--max-cpu / --max-ram-mb — Go runtime caps (GOMAXPROCS / GOMEMLIMIT)
--feed-disk-limit-mb — hard 507 refusal for new post bodies over quota
--chain-disk-limit-mb — advisory watcher (can't reject blocks without
breaking consensus; logs WARN every minute)
Client — Saved Messages (self-chat):
- Auto-created on sign-in, pinned top of chat list, blue bookmark avatar
- Send short-circuits the relay (no encrypt, no fee, no mailbox hop)
- Empty state rendered outside inverted FlatList — fixes the mirrored
"say hi…" on Android RTL-aware layout builds
- PostCard shows "You" for own posts instead of the self-contact alias
Client — user walls:
- New route /(app)/feed/author/[pub] with infinite-scroll via
`created_at` cursor and pull-to-refresh
- Profile screen gains "View posts" button (universal) next to
"Open chat" (contact-only)
Feed pipeline:
- Bump client JPEG quality 0.5 → 0.75 to match server scrubber (Q=75),
so a 60 KiB compose doesn't balloon past 256 KiB after server re-encode
- ErrPostTooLarge now wraps with the actual size vs cap, errors.Is
preserved in the HTTP layer
- FeedMailbox quota + DiskUsage surface — supports new CLI flag
README:
- Step-by-step "first node / joiner" section on the landing page,
full flag tables incl. the new resource-cap group, minimal
checklists for open/private/low-end deployments
469 lines
15 KiB
Go
469 lines
15 KiB
Go
package relay
|
||
|
||
// FeedMailbox — BadgerDB-backed storage for social-feed post bodies.
//
// Posts are PUBLIC (plaintext) — unlike the E2E inbox envelopes, feed posts
// have no recipient key. They live keyed by post ID and can be read by
// anyone via GET /feed/post/{id}.
//
// Storage layout (keys; see the feed*Prefix constants below):
//
//	feedpost:<postID>             → FeedPost JSON (body + metadata)
//	feedauthor:<author>:<ts>:<id> → postID (chrono index for GET /feed/author)
//	feedview:<postID>             → uint64 big-endian (view counter)
//	feedtag:<tag>:<ts>:<id>       → postID (inverted index for #tag search)
//	feedtrend:<score>:<id>        → postID (ranked index; score = likes × 2 + views)
//
// View counts are off-chain because on-chain would mean one tx per view —
// financially and architecturally unreasonable. Likes stay on-chain
// (provable authorship + anti-Sybil via fee).
//
// Anti-spam:
//   - MaxPostBodySize is enforced at Store time.
//   - Per-sender rate limiting happens at the HTTP layer (withSubmitTxGuards).
||
import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
	"unicode/utf8"

	badger "github.com/dgraph-io/badger/v4"
)
|
||
|
||
const (
	// Key prefixes for the BadgerDB keyspace. Timestamps are zero-padded
	// to 20 digits (%020d) so lexicographic key order equals chronological
	// order within a prefix.
	feedPostPrefix      = "feedpost:"
	feedAuthorIdxPrefix = "feedauthor:" // feedauthor:<author>:<ts_20d>:<postID>
	feedViewPrefix      = "feedview:"   // feedview:<postID> → uint64
	feedHashtagPrefix   = "feedtag:"    // feedtag:<tag>:<ts_20d>:<postID>
	feedTrendingPrefix  = "feedtrend:"  // feedtrend:<score_20d_inv>:<postID>
	// NOTE(review): nothing in this file writes feedtrend:* keys —
	// presumably maintained elsewhere (or planned); confirm before relying on it.

	// MaxPostBodySize is the hard cap on a post's on-wire size. Matches
	// blockchain.MaxPostSize so the on-chain fee estimate is always
	// enforceable (no "I claimed 10 KiB but sent 50 KiB" trick).
	MaxPostBodySize = 256 * 1024 // 256 KiB

	// FeedPostDefaultTTLDays is how long a post body lives before BadgerDB
	// auto-evicts it. On-chain metadata stays forever, so a reader hitting
	// a stale post sees the record with a "body unavailable" indicator.
	// Configurable via the env var DCHAIN_FEED_TTL_DAYS (handled in main.go).
	FeedPostDefaultTTLDays = 30

	// maxHashtagsPerPost caps how many distinct hashtags we'll index per
	// post. Prevents a spammer from polluting every tag namespace with one
	// mega-post.
	maxHashtagsPerPost = 8

	// trendingHalfLifeSeconds controls how quickly a post's score decays.
	// Used when computing "trending": recent engagement weighs more than old.
	// NOTE(review): not referenced in this file — presumably consumed by the
	// trending endpoint elsewhere; verify.
	trendingHalfLifeSeconds = 12 * 3600 // 12 hours
)
|
||
|
||
// FeedPost is the off-chain body. On-chain we keep the metadata in
// blockchain.PostRecord — here we store the readable payload.
//
// Why not just put the body on-chain? Size — a 256 KiB post × thousands
// per day would bloat the block history. Keeping it in a relay DB with a
// TTL gives us ephemerality while still letting on-chain records serve as
// the permanent proof of authorship.
type FeedPost struct {
	// Identity (matches on-chain PostRecord.PostID).
	PostID string `json:"post_id"`
	Author string `json:"author"` // Ed25519 hex

	// Payload. Content is always plaintext (posts are public). Attachment is
	// a pre-compressed blob — client is expected to have minimised size
	// before publish. If empty, the post is text-only.
	Content        string   `json:"content"`
	ContentType    string   `json:"content_type,omitempty"` // "text/plain" | "text/markdown" | ...
	Attachment     []byte   `json:"attachment,omitempty"`
	AttachmentMIME string   `json:"attachment_mime,omitempty"`
	Hashtags       []string `json:"hashtags,omitempty"` // lowercased, without leading #; overwritten by Store from Content

	// CreatedAt matches the on-chain tx timestamp — we stamp it server-side
	// at Store() so senders can't back-date. Unix seconds (per the cutoff
	// math in RecentPostIDs).
	CreatedAt int64 `json:"created_at"`

	// ReplyTo / QuoteOf mirror the on-chain PostRecord fields, included
	// here so the client can thread without a second RPC.
	ReplyTo string `json:"reply_to,omitempty"`
	QuoteOf string `json:"quote_of,omitempty"`
}
|
||
|
||
// ErrPostTooLarge is returned by Store when the post body exceeds
// MaxPostBodySize. Store wraps it with the offending numbers, so callers
// must match with errors.Is rather than ==.
var ErrPostTooLarge = errors.New("post body exceeds maximum allowed size")

// ErrFeedQuotaExceeded is returned by Store when the on-disk footprint
// (LSM + value log) plus the incoming post would exceed the operator-set
// disk quota. Ops set this via --feed-disk-limit-mb. Zero = unlimited.
var ErrFeedQuotaExceeded = errors.New("feed mailbox disk quota exceeded")
|
||
|
||
// FeedMailbox stores feed post bodies.
type FeedMailbox struct {
	db         *badger.DB    // Badger handle; Close() closes it even when supplied by the caller via NewFeedMailbox
	ttl        time.Duration // per-entry TTL applied to post bodies, indices, and view counters
	quotaBytes int64         // 0 = unlimited
}
|
||
|
||
// NewFeedMailbox wraps an already-open Badger DB. TTL controls how long
|
||
// post bodies live before auto-eviction (on-chain metadata persists
|
||
// forever independently). quotaBytes caps the on-disk footprint; 0 or
|
||
// negative means unlimited.
|
||
func NewFeedMailbox(db *badger.DB, ttl time.Duration, quotaBytes int64) *FeedMailbox {
|
||
if ttl <= 0 {
|
||
ttl = time.Duration(FeedPostDefaultTTLDays) * 24 * time.Hour
|
||
}
|
||
if quotaBytes < 0 {
|
||
quotaBytes = 0
|
||
}
|
||
return &FeedMailbox{db: db, ttl: ttl, quotaBytes: quotaBytes}
|
||
}
|
||
|
||
// OpenFeedMailbox opens (or creates) a dedicated BadgerDB at dbPath.
|
||
// quotaBytes caps the total on-disk footprint (LSM + vlog); 0 = unlimited.
|
||
func OpenFeedMailbox(dbPath string, ttl time.Duration, quotaBytes int64) (*FeedMailbox, error) {
|
||
opts := badger.DefaultOptions(dbPath).
|
||
WithLogger(nil).
|
||
WithValueLogFileSize(128 << 20).
|
||
WithNumVersionsToKeep(1).
|
||
WithCompactL0OnClose(true)
|
||
db, err := badger.Open(opts)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("open feed mailbox db: %w", err)
|
||
}
|
||
return NewFeedMailbox(db, ttl, quotaBytes), nil
|
||
}
|
||
|
||
// DiskUsage returns the current on-disk footprint (LSM + value log) in
|
||
// bytes. Cheap — Badger tracks these counters internally.
|
||
func (fm *FeedMailbox) DiskUsage() int64 {
|
||
lsm, vlog := fm.db.Size()
|
||
return lsm + vlog
|
||
}
|
||
|
||
// Quota returns the configured disk quota in bytes. 0 = unlimited.
|
||
func (fm *FeedMailbox) Quota() int64 { return fm.quotaBytes }
|
||
|
||
// Close releases the underlying Badger handle.
|
||
func (fm *FeedMailbox) Close() error { return fm.db.Close() }
|
||
|
||
// Store persists a post body and updates all indices. `createdAt` is the
|
||
// canonical timestamp (usually from the chain tx) and becomes the
|
||
// server's view of when the post happened — clients' wall-clock values
|
||
// are ignored.
|
||
//
|
||
// Returns the set of hashtags actually indexed (after dedup + cap).
|
||
func (fm *FeedMailbox) Store(post *FeedPost, createdAt int64) ([]string, error) {
|
||
size := estimatePostSize(post)
|
||
if size > MaxPostBodySize {
|
||
// Wrap the sentinel so the HTTP layer can still errors.Is() on it
|
||
// while the operator / client sees the actual offending numbers.
|
||
// This catches the common case where the client's pre-scrub
|
||
// estimate is below the cap but the server re-encode (quality=75
|
||
// JPEG) inflates past it.
|
||
return nil, fmt.Errorf("%w: size %d > max %d (after server scrub)",
|
||
ErrPostTooLarge, size, MaxPostBodySize)
|
||
}
|
||
// Disk quota: refuse new bodies once we're already over the cap.
|
||
// `size` is a post-body estimate, not the exact BadgerDB write-amp
|
||
// cost; we accept that slack — the goal is a coarse guard-rail so
|
||
// an operator's disk doesn't blow up unnoticed. Exceeding nodes
|
||
// still serve existing posts; only new Store() calls are refused.
|
||
if fm.quotaBytes > 0 {
|
||
if fm.DiskUsage()+int64(size) > fm.quotaBytes {
|
||
return nil, ErrFeedQuotaExceeded
|
||
}
|
||
}
|
||
|
||
post.CreatedAt = createdAt
|
||
// Normalise hashtags — the client may or may not have supplied them;
|
||
// we derive from Content as the authoritative source, then dedup.
|
||
tags := extractHashtags(post.Content)
|
||
if len(tags) > maxHashtagsPerPost {
|
||
tags = tags[:maxHashtagsPerPost]
|
||
}
|
||
post.Hashtags = tags
|
||
|
||
val, err := json.Marshal(post)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("marshal post: %w", err)
|
||
}
|
||
|
||
err = fm.db.Update(func(txn *badger.Txn) error {
|
||
// Idempotent on postID — second Store is a no-op.
|
||
key := []byte(feedPostPrefix + post.PostID)
|
||
if _, err := txn.Get(key); err == nil {
|
||
return nil
|
||
}
|
||
entry := badger.NewEntry(key, val).WithTTL(fm.ttl)
|
||
if err := txn.SetEntry(entry); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Author chrono index.
|
||
authorKey := fmt.Sprintf("%s%s:%020d:%s", feedAuthorIdxPrefix, post.Author, createdAt, post.PostID)
|
||
if err := txn.SetEntry(
|
||
badger.NewEntry([]byte(authorKey), []byte(post.PostID)).WithTTL(fm.ttl),
|
||
); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Hashtag inverted index.
|
||
for _, tag := range tags {
|
||
tagKey := fmt.Sprintf("%s%s:%020d:%s", feedHashtagPrefix, tag, createdAt, post.PostID)
|
||
if err := txn.SetEntry(
|
||
badger.NewEntry([]byte(tagKey), []byte(post.PostID)).WithTTL(fm.ttl),
|
||
); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return tags, nil
|
||
}
|
||
|
||
// Get returns the full post body, or nil if not found / evicted.
|
||
func (fm *FeedMailbox) Get(postID string) (*FeedPost, error) {
|
||
var p FeedPost
|
||
err := fm.db.View(func(txn *badger.Txn) error {
|
||
item, err := txn.Get([]byte(feedPostPrefix + postID))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return item.Value(func(val []byte) error {
|
||
return json.Unmarshal(val, &p)
|
||
})
|
||
})
|
||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||
return nil, nil
|
||
}
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &p, nil
|
||
}
|
||
|
||
// Delete removes a post body and its indices. On-chain soft-delete stays;
|
||
// this just frees storage. Called by DELETE_POST event handler hook.
|
||
func (fm *FeedMailbox) Delete(postID string) error {
|
||
// We need author and createdAt to build index keys — fetch first.
|
||
post, err := fm.Get(postID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if post == nil {
|
||
return nil
|
||
}
|
||
return fm.db.Update(func(txn *badger.Txn) error {
|
||
if err := txn.Delete([]byte(feedPostPrefix + postID)); err != nil {
|
||
return err
|
||
}
|
||
authorKey := fmt.Sprintf("%s%s:%020d:%s",
|
||
feedAuthorIdxPrefix, post.Author, post.CreatedAt, postID)
|
||
if err := txn.Delete([]byte(authorKey)); err != nil {
|
||
return err
|
||
}
|
||
for _, tag := range post.Hashtags {
|
||
tagKey := fmt.Sprintf("%s%s:%020d:%s",
|
||
feedHashtagPrefix, tag, post.CreatedAt, postID)
|
||
if err := txn.Delete([]byte(tagKey)); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
|
||
// IncrementView bumps the view counter for a post. No-op on missing post.
|
||
// Returns the new count. Views are ephemeral (tied to the post TTL) —
|
||
// a fresh relay that gossip-loads an old post starts from 0, which is
|
||
// acceptable for a non-authoritative metric.
|
||
func (fm *FeedMailbox) IncrementView(postID string) (uint64, error) {
|
||
var next uint64
|
||
err := fm.db.Update(func(txn *badger.Txn) error {
|
||
key := []byte(feedViewPrefix + postID)
|
||
var cur uint64
|
||
if item, err := txn.Get(key); err == nil {
|
||
_ = item.Value(func(val []byte) error {
|
||
if len(val) == 8 {
|
||
cur = binary.BigEndian.Uint64(val)
|
||
}
|
||
return nil
|
||
})
|
||
} else if !errors.Is(err, badger.ErrKeyNotFound) {
|
||
return err
|
||
}
|
||
next = cur + 1
|
||
var buf [8]byte
|
||
binary.BigEndian.PutUint64(buf[:], next)
|
||
return txn.SetEntry(badger.NewEntry(key, buf[:]).WithTTL(fm.ttl))
|
||
})
|
||
return next, err
|
||
}
|
||
|
||
// ViewCount returns the current (off-chain) view count for a post.
|
||
func (fm *FeedMailbox) ViewCount(postID string) (uint64, error) {
|
||
var n uint64
|
||
err := fm.db.View(func(txn *badger.Txn) error {
|
||
item, err := txn.Get([]byte(feedViewPrefix + postID))
|
||
if errors.Is(err, badger.ErrKeyNotFound) {
|
||
return nil
|
||
}
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return item.Value(func(val []byte) error {
|
||
if len(val) == 8 {
|
||
n = binary.BigEndian.Uint64(val)
|
||
}
|
||
return nil
|
||
})
|
||
})
|
||
return n, err
|
||
}
|
||
|
||
// PostsByAuthor lists the N most recent post IDs by an author, newest first.
|
||
// Pure ID listing — callers fetch bodies via Get.
|
||
func (fm *FeedMailbox) PostsByAuthor(authorPub string, limit int) ([]string, error) {
|
||
if limit <= 0 || limit > 200 {
|
||
limit = 50
|
||
}
|
||
prefix := []byte(feedAuthorIdxPrefix + authorPub + ":")
|
||
return fm.reverseIDScan(prefix, limit)
|
||
}
|
||
|
||
// PostsByHashtag lists the N most recent posts tagged with tag (lowercased).
|
||
func (fm *FeedMailbox) PostsByHashtag(tag string, limit int) ([]string, error) {
|
||
tag = strings.ToLower(strings.TrimPrefix(tag, "#"))
|
||
if tag == "" {
|
||
return nil, nil
|
||
}
|
||
if limit <= 0 || limit > 200 {
|
||
limit = 50
|
||
}
|
||
prefix := []byte(feedHashtagPrefix + tag + ":")
|
||
return fm.reverseIDScan(prefix, limit)
|
||
}
|
||
|
||
// reverseIDScan walks prefix in reverse lex order and returns the value
// (postID) of each entry up to limit. Used for newest-first indices:
// keys embed a zero-padded timestamp, so reverse lex order is
// reverse-chronological order.
func (fm *FeedMailbox) reverseIDScan(prefix []byte, limit int) ([]string, error) {
	out := make([]string, 0, limit)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		opts.Reverse = true
		// Reverse iteration needs a seek key *past* every real key under
		// the prefix; appending 0xff does that (assumes no index key has
		// a 0xff byte right after the prefix — true for the hex/digit/
		// ASCII key material written by Store).
		seek := append([]byte{}, prefix...)
		seek = append(seek, 0xff)
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Seek(seek); it.ValidForPrefix(prefix) && len(out) < limit; it.Next() {
			item := it.Item()
			// Value errors are deliberately dropped: a single unreadable
			// index entry should not abort the whole listing.
			_ = item.Value(func(val []byte) error {
				out = append(out, string(val))
				return nil
			})
		}
		return nil
	})
	return out, err
}
|
||
|
||
// RecentPostIDs enumerates the most recent posts stored by this relay
// across ALL authors, newest first. Used by the trending /
// recommendations endpoints to seed the candidate pool. maxAgeSeconds
// bounds the walk (0 = no bound); limit is clamped to (0, 500] with a
// default of 100.
func (fm *FeedMailbox) RecentPostIDs(maxAgeSeconds int64, limit int) ([]string, error) {
	if limit <= 0 || limit > 500 {
		limit = 100
	}
	// Can't reuse chrono indices because they're per-author. We scan post:*
	// and collect, sorted by CreatedAt from the decoded body. This is O(M)
	// where M = #posts in DB — fine for MVP since TTL-bounded M is small
	// (~5k posts × 30d TTL on a busy node).
	type candidate struct {
		id string // PostID
		ts int64  // CreatedAt (unix seconds)
	}
	// cutoff == 0 means "no age bound": every CreatedAt >= 0 passes.
	cutoff := int64(0)
	if maxAgeSeconds > 0 {
		cutoff = time.Now().Unix() - maxAgeSeconds
	}
	var candidates []candidate
	prefix := []byte(feedPostPrefix)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			err := item.Value(func(val []byte) error {
				var p FeedPost
				if err := json.Unmarshal(val, &p); err != nil {
					return nil // skip corrupt
				}
				if p.CreatedAt < cutoff {
					return nil // too old — outside the age window
				}
				candidates = append(candidates, candidate{id: p.PostID, ts: p.CreatedAt})
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Newest first, then truncate to limit.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts })
	if len(candidates) > limit {
		candidates = candidates[:limit]
	}
	out := make([]string, len(candidates))
	for i, c := range candidates {
		out[i] = c.id
	}
	return out, nil
}
|
||
|
||
// hashtagRegex matches '#' followed by 1–40 word characters, where a
// word character is an ASCII letter/digit/underscore or any Unicode
// letter (\p{L}).
var hashtagRegex = regexp.MustCompile(`#[A-Za-z0-9_\p{L}]{1,40}`)

// extractHashtags finds #word tokens in text, lowercases them, dedups,
// and preserves first-seen order. Length is capped at 40 characters.
//
// Fixes vs the original: the doc claimed the word class was only
// [A-Za-z0-9_] although the regex also accepts \p{L}; and the length
// guard counted bytes (len) while the regex counts runes, so a valid
// 40-rune multi-byte tag (e.g. 40 accented letters = 80 bytes) was
// silently dropped. The guard now counts runes to match the regex.
func extractHashtags(text string) []string {
	matches := hashtagRegex.FindAllString(text, -1)
	seen := make(map[string]struct{}, len(matches))
	out := make([]string, 0, len(matches))
	for _, m := range matches {
		tag := strings.ToLower(strings.TrimPrefix(m, "#"))
		if n := utf8.RuneCountInString(tag); n == 0 || n > 40 {
			continue
		}
		if _, dup := seen[tag]; dup {
			continue
		}
		seen[tag] = struct{}{}
		out = append(out, tag)
	}
	return out
}
||
|
||
// estimatePostSize returns the on-disk size used for fee calculation.
|
||
// Matches the client's pre-publish size estimate so fees are predictable.
|
||
func estimatePostSize(post *FeedPost) uint64 {
|
||
n := uint64(len(post.Content)) + uint64(len(post.Attachment))
|
||
// Small overhead for metadata (~120 bytes of JSON scaffolding).
|
||
n += 128
|
||
return n
|
||
}
|