Files
dchain/relay/feed_mailbox.go
vsecoder 126658f294 feat(feed): relay body storage + HTTP endpoints (Phase B of v2.0.0)
Phase A (the previous commit) added the on-chain foundations. Phase B
is the off-chain layer: post bodies live in a BadgerDB-backed feed
mailbox, and a full HTTP surface makes the feed usable from clients.

New components

  relay/feed_mailbox.go (+ tests)
    - FeedPost: body + content-type + attachment + hashtags + thread refs
    - Store / Get / Delete with TTL-bounded eviction (30 days default)
    - View counter (IncrementView / ViewCount) — off-chain because one
      tx per view would be nonsense
    - Hashtag inverted index: auto-extracts #tokens from content on
      Store, lowercased + deduped + capped at 8/post
    - Author chrono index: PostsByAuthor returns newest-first IDs
    - RecentPostIDs: scan-by-age helper used by trending/foryou

  node/api_feed.go
    POST /feed/publish           — author-signed body upload, returns
                                   post_id + content_hash + size +
                                   hashtags + estimated fee for the
                                   follow-up on-chain CREATE_POST tx
    GET  /feed/post/{id}         — fetch body (respects on-chain soft
                                   delete, returns 410 when deleted)
    GET  /feed/post/{id}/stats   — {views, likes, liked_by_me?}
    POST /feed/post/{id}/view    — bump the counter
    GET  /feed/author/{pub}      — chain-authoritative post list
                                   enriched with body + stats
    GET  /feed/timeline          — merged feed from people the user
                                   follows (reads chain.Following,
                                   fetches each author's recent posts)
    GET  /feed/trending          — top-scored posts in last 24h
                                   (score = likes × 3 + views)
    GET  /feed/foryou            — simple recommendations: recent posts
                                   minus authors the user already
                                   follows, already-liked posts, and
                                   own posts; ranked by engagement
    GET  /feed/hashtag/{tag}     — posts tagged with the given #tag

  cmd/node/main.go wiring
    - --feed-db flag (DCHAIN_FEED_DB) + --feed-ttl-days (DCHAIN_FEED_TTL_DAYS)
    - Opens FeedMailbox + registers FeedRoutes alongside RelayRoutes
    - Threads chain.Post / LikeCount / HasLiked / PostsByAuthor / Following
      into FeedConfig so HTTP handlers can merge on-chain metadata with
      off-chain body+stats.

Auth & safety
  - POST /feed/publish: Ed25519 signature over "publish:<post_id>:
    <content_sha256_hex>:<ts>"; ±5-minute skew window for anti-replay.
  - content_hash binds body to the on-chain tx — you can't publish
    body-A off-chain and commit hash-of-body-B on-chain.
  - Writes wrapped in withSubmitTxGuards (rate-limit + size cap), reads
    in withReadLimit — same guards as /relay.

Trending / recommendations
  - V1 heuristic (likes × 3 + views) + time window. Documented as
    v2.2.0 "Feed algorithm" candidate for a proper ranking layer
    (half-life decay, follow-of-follow boost, hashtag collaborative).

Tests
  - Store round-trip, size enforcement, hashtag indexing (case-insensitive
    + dedup), view counter increments, author chrono order, delete
    cleans all indices, RecentPostIDs time-window filter.
  - Full go test ./... is green (blockchain + consensus + identity +
    relay + vm all pass).

Next (Phase C): client Feed tab — composer, timeline, post detail,
profile follow, For You + Trending screens.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 18:52:22 +03:00

432 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
// FeedMailbox — BadgerDB-backed storage for social-feed post bodies.
//
// Posts are PUBLIC (plaintext) — unlike the E2E inbox envelopes, feed posts
// have no recipient key. They live keyed by post ID and can be read by
// anyone via GET /feed/post/{id}.
//
// Storage layout (keys):
//
// feedpost:<postID>               → FeedPost JSON (body + metadata)
// feedauthor:<author>:<ts>:<id>   → postID (chrono index for GET /feed/author)
// feedview:<postID>               → uint64 big-endian (view counter)
// feedtag:<tag>:<ts>:<id>         → postID (inverted index for #tag search)
// feedtrend:<score>:<id>          → postID (score-ranked index; the scoring formula lives in the HTTP layer)
//
// View counts are off-chain because on-chain would mean one tx per view —
// financially and architecturally unreasonable. Likes stay on-chain
// (provable authorship + anti-Sybil via fee).
//
// Anti-spam:
// - MaxPostBodySize is enforced at Store time.
// - Per-sender rate limiting happens at the HTTP layer (withSubmitTxGuards).
import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
	"unicode/utf8"

	badger "github.com/dgraph-io/badger/v4"
)
const (
	// Key prefixes for the BadgerDB keyspace. Index keys embed a
	// zero-padded 20-digit decimal timestamp (%020d) so lexicographic key
	// order equals chronological order — see Store and reverseIDScan.
	feedPostPrefix      = "feedpost:"
	feedAuthorIdxPrefix = "feedauthor:" // feedauthor:<author>:<ts_20d>:<postID>
	feedViewPrefix      = "feedview:"   // feedview:<postID> → uint64 big-endian
	feedHashtagPrefix   = "feedtag:"    // feedtag:<tag>:<ts_20d>:<postID>
	feedTrendingPrefix  = "feedtrend:"  // feedtrend:<score_20d_inv>:<postID>; NOTE(review): never written in this file — presumably reserved for the HTTP trending layer, confirm.
	// MaxPostBodySize is the hard cap on a post's on-wire size, enforced at
	// Store time via estimatePostSize. Said to match blockchain.MaxPostSize
	// so the on-chain fee estimate is always enforceable (no "I claimed
	// 10 KiB but sent 50 KiB" trick) — TODO confirm the two constants are
	// actually kept in sync; the blockchain package is not visible here.
	MaxPostBodySize = 256 * 1024 // 256 KiB
	// FeedPostDefaultTTLDays is how long a post body lives before BadgerDB
	// auto-evicts it. On-chain metadata stays forever, so a reader hitting
	// a stale post sees the record with a "body unavailable" indicator.
	// Configurable via the env var DCHAIN_FEED_TTL_DAYS (handled in main.go).
	FeedPostDefaultTTLDays = 30
	// maxHashtagsPerPost caps how many distinct hashtags we'll index per
	// post. Prevents a spammer from polluting every tag namespace with one
	// mega-post. Applied in Store after extraction/dedup.
	maxHashtagsPerPost = 8
	// trendingHalfLifeSeconds controls how quickly a post's score decays.
	// NOTE(review): unreferenced in this file; presumably consumed by the
	// trending computation in the HTTP layer — confirm before relying on it.
	trendingHalfLifeSeconds = 12 * 3600 // 12 hours
)
// FeedPost is the off-chain body. On-chain we keep the metadata in
// blockchain.PostRecord — here we store the readable payload.
//
// Why not just put the body on-chain? Size — a 256 KiB post × thousands
// per day would bloat the block history. Keeping it in a relay DB with a
// TTL gives us ephemerality while still letting on-chain records serve as
// the permanent proof of authorship.
type FeedPost struct {
	// Identity. PostID is the storage key under feedPostPrefix; stated to
	// match on-chain PostRecord.PostID (not verifiable from this file).
	PostID string `json:"post_id"`
	Author string `json:"author"` // Ed25519 hex; key component of the author chrono index
	// Payload. Content is always plaintext (posts are public). Attachment is
	// a pre-compressed blob — client is expected to have minimised size
	// before publish. If empty, the post is text-only. Note: Attachment is
	// base64-encoded by encoding/json on the wire.
	Content        string   `json:"content"`
	ContentType    string   `json:"content_type,omitempty"` // "text/plain" | "text/markdown" | ...
	Attachment     []byte   `json:"attachment,omitempty"`
	AttachmentMIME string   `json:"attachment_mime,omitempty"`
	Hashtags       []string `json:"hashtags,omitempty"` // lowercased, without leading #; overwritten by Store from Content
	// CreatedAt is stamped server-side in Store() from the caller-supplied
	// canonical timestamp, so senders can't back-date posts.
	CreatedAt int64 `json:"created_at"`
	// ReplyTo / QuoteOf mirror the on-chain PostRecord thread fields,
	// included here so the client can thread without a second RPC.
	ReplyTo string `json:"reply_to,omitempty"`
	QuoteOf string `json:"quote_of,omitempty"`
}
// ErrPostTooLarge is returned by Store when estimatePostSize(post) exceeds
// MaxPostBodySize. Compare with errors.Is.
var ErrPostTooLarge = errors.New("post body exceeds maximum allowed size")
// FeedMailbox stores feed post bodies and their secondary indices in a
// BadgerDB instance. All entries are written with the configured TTL, so
// storage is self-evicting. Safe for concurrent use to the extent the
// underlying *badger.DB is.
type FeedMailbox struct {
	db  *badger.DB    // underlying key-value store (owned if opened via OpenFeedMailbox)
	ttl time.Duration // per-entry time-to-live applied on every write
}
// NewFeedMailbox wraps an already-open Badger DB. TTL controls how long
// post bodies live before auto-eviction (on-chain metadata persists
// forever independently). A non-positive ttl falls back to the
// FeedPostDefaultTTLDays default.
func NewFeedMailbox(db *badger.DB, ttl time.Duration) *FeedMailbox {
	mailbox := &FeedMailbox{db: db, ttl: ttl}
	if mailbox.ttl <= 0 {
		mailbox.ttl = FeedPostDefaultTTLDays * 24 * time.Hour
	}
	return mailbox
}
// OpenFeedMailbox opens (or creates) a dedicated BadgerDB at dbPath and
// wraps it in a FeedMailbox. The caller owns the returned mailbox and must
// Close it to release the DB handle.
func OpenFeedMailbox(dbPath string, ttl time.Duration) (*FeedMailbox, error) {
	opts := badger.DefaultOptions(dbPath)
	opts = opts.WithLogger(nil)                   // silence badger's default logger
	opts = opts.WithValueLogFileSize(128 << 20)   // 128 MiB value-log segments
	opts = opts.WithNumVersionsToKeep(1)          // no MVCC history needed
	opts = opts.WithCompactL0OnClose(true)        // tidy up on shutdown

	db, err := badger.Open(opts)
	if err != nil {
		return nil, fmt.Errorf("open feed mailbox db: %w", err)
	}
	return NewFeedMailbox(db, ttl), nil
}
// Close releases the underlying Badger handle. After Close, all other
// methods will fail; call exactly once on shutdown.
func (fm *FeedMailbox) Close() error { return fm.db.Close() }
// Store persists a post body and updates all indices. createdAt is the
// canonical timestamp (usually from the chain tx) and becomes the
// server's view of when the post happened — clients' wall-clock values
// are ignored.
//
// Hashtags are always re-derived from Content (any client-supplied list is
// discarded), lowercased, deduped, and capped at maxHashtagsPerPost. Store
// is idempotent on PostID: a second call for an existing post is a no-op.
//
// Returns the set of hashtags actually indexed (after dedup + cap), or
// ErrPostTooLarge when the body exceeds MaxPostBodySize.
func (fm *FeedMailbox) Store(post *FeedPost, createdAt int64) ([]string, error) {
	if estimatePostSize(post) > MaxPostBodySize {
		return nil, ErrPostTooLarge
	}
	post.CreatedAt = createdAt

	// Content is the authoritative source for hashtags.
	tags := extractHashtags(post.Content)
	if len(tags) > maxHashtagsPerPost {
		tags = tags[:maxHashtagsPerPost]
	}
	post.Hashtags = tags

	val, err := json.Marshal(post)
	if err != nil {
		return nil, fmt.Errorf("marshal post: %w", err)
	}
	err = fm.db.Update(func(txn *badger.Txn) error {
		key := []byte(feedPostPrefix + post.PostID)
		// Idempotency check. FIX: the original treated ANY Get error as
		// "not stored yet"; a transient read error must abort the txn
		// instead of silently proceeding to (re)write the record.
		if _, getErr := txn.Get(key); getErr == nil {
			return nil // already stored — no-op
		} else if !errors.Is(getErr, badger.ErrKeyNotFound) {
			return getErr
		}
		if err := txn.SetEntry(badger.NewEntry(key, val).WithTTL(fm.ttl)); err != nil {
			return err
		}
		// Author chrono index — zero-padded timestamp makes lexicographic
		// order equal chronological order.
		authorKey := fmt.Sprintf("%s%s:%020d:%s", feedAuthorIdxPrefix, post.Author, createdAt, post.PostID)
		if err := txn.SetEntry(
			badger.NewEntry([]byte(authorKey), []byte(post.PostID)).WithTTL(fm.ttl),
		); err != nil {
			return err
		}
		// Hashtag inverted index.
		for _, tag := range tags {
			tagKey := fmt.Sprintf("%s%s:%020d:%s", feedHashtagPrefix, tag, createdAt, post.PostID)
			if err := txn.SetEntry(
				badger.NewEntry([]byte(tagKey), []byte(post.PostID)).WithTTL(fm.ttl),
			); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return tags, nil
}
// Get returns the full post body, or (nil, nil) when the post was never
// stored or has been TTL-evicted. Any other storage or decode failure is
// returned as an error.
func (fm *FeedMailbox) Get(postID string) (*FeedPost, error) {
	var post *FeedPost
	viewErr := fm.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(feedPostPrefix + postID))
		if err != nil {
			return err
		}
		return item.Value(func(raw []byte) error {
			decoded := new(FeedPost)
			if err := json.Unmarshal(raw, decoded); err != nil {
				return err
			}
			post = decoded
			return nil
		})
	})
	switch {
	case errors.Is(viewErr, badger.ErrKeyNotFound):
		return nil, nil
	case viewErr != nil:
		return nil, viewErr
	}
	return post, nil
}
// Delete removes a post body, its index entries, and its view counter.
// On-chain soft-delete stays; this just frees storage. Called by the
// DELETE_POST event handler hook. No-op when the post is already gone.
func (fm *FeedMailbox) Delete(postID string) error {
	// Author and CreatedAt are needed to reconstruct the index keys — fetch
	// the body first.
	post, err := fm.Get(postID)
	if err != nil {
		return err
	}
	if post == nil {
		return nil // already evicted or never stored
	}
	return fm.db.Update(func(txn *badger.Txn) error {
		if err := txn.Delete([]byte(feedPostPrefix + postID)); err != nil {
			return err
		}
		// FIX: the view counter written by IncrementView was previously
		// never removed here — it leaked until TTL eviction, contradicting
		// the "delete cleans all indices" contract.
		if err := txn.Delete([]byte(feedViewPrefix + postID)); err != nil {
			return err
		}
		authorKey := fmt.Sprintf("%s%s:%020d:%s",
			feedAuthorIdxPrefix, post.Author, post.CreatedAt, postID)
		if err := txn.Delete([]byte(authorKey)); err != nil {
			return err
		}
		for _, tag := range post.Hashtags {
			tagKey := fmt.Sprintf("%s%s:%020d:%s",
				feedHashtagPrefix, tag, post.CreatedAt, postID)
			if err := txn.Delete([]byte(tagKey)); err != nil {
				return err
			}
		}
		return nil
	})
}
// IncrementView bumps the view counter for a post and returns the new
// count (valid only when err is nil). Counters are created on first view,
// so this works even for posts this relay never stored. Views are
// ephemeral (tied to the post TTL) — a fresh relay that gossip-loads an
// old post starts from 0, which is acceptable for a non-authoritative
// metric.
func (fm *FeedMailbox) IncrementView(postID string) (uint64, error) {
	var next uint64
	err := fm.db.Update(func(txn *badger.Txn) error {
		key := []byte(feedViewPrefix + postID)
		var cur uint64
		if item, err := txn.Get(key); err == nil {
			// FIX: the Value error was previously discarded with `_ =`,
			// so a read failure silently reset the counter to 1.
			if verr := item.Value(func(val []byte) error {
				if len(val) == 8 {
					cur = binary.BigEndian.Uint64(val)
				}
				return nil
			}); verr != nil {
				return verr
			}
		} else if !errors.Is(err, badger.ErrKeyNotFound) {
			return err
		}
		next = cur + 1
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], next)
		return txn.SetEntry(badger.NewEntry(key, buf[:]).WithTTL(fm.ttl))
	})
	return next, err
}
// ViewCount returns the current (off-chain) view count for a post. A post
// that was never viewed — or whose counter has been TTL-evicted — reads
// as zero, not as an error.
func (fm *FeedMailbox) ViewCount(postID string) (uint64, error) {
	var count uint64
	err := fm.db.View(func(txn *badger.Txn) error {
		item, getErr := txn.Get([]byte(feedViewPrefix + postID))
		switch {
		case errors.Is(getErr, badger.ErrKeyNotFound):
			return nil // no counter yet → zero
		case getErr != nil:
			return getErr
		}
		return item.Value(func(raw []byte) error {
			if len(raw) == 8 {
				count = binary.BigEndian.Uint64(raw)
			}
			return nil
		})
	})
	return count, err
}
// PostsByAuthor lists the N most recent post IDs by an author, newest
// first. Pure ID listing — callers fetch bodies via Get. Out-of-range
// limits (<=0 or >200) are clamped to 50.
func (fm *FeedMailbox) PostsByAuthor(authorPub string, limit int) ([]string, error) {
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	return fm.reverseIDScan([]byte(feedAuthorIdxPrefix+authorPub+":"), limit)
}
// PostsByHashtag lists the N most recent posts tagged with tag. The tag is
// normalised (leading '#' stripped, lowercased) to match the index written
// by Store; an empty normalised tag yields no results. Out-of-range limits
// (<=0 or >200) are clamped to 50.
func (fm *FeedMailbox) PostsByHashtag(tag string, limit int) ([]string, error) {
	normalized := strings.ToLower(strings.TrimPrefix(tag, "#"))
	if normalized == "" {
		return nil, nil
	}
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	return fm.reverseIDScan([]byte(feedHashtagPrefix+normalized+":"), limit)
}
// reverseIDScan walks prefix in reverse lex order and returns the value
// (postID) of each entry up to limit. Because index keys embed a
// zero-padded timestamp, reverse lex order is newest-first.
func (fm *FeedMailbox) reverseIDScan(prefix []byte, limit int) ([]string, error) {
	out := make([]string, 0, limit)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		opts.Reverse = true
		// Seek just past the prefix range so the reverse walk starts at the
		// lexicographically-largest (newest) key under the prefix.
		seek := append([]byte{}, prefix...)
		seek = append(seek, 0xff)
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Seek(seek); it.ValidForPrefix(prefix) && len(out) < limit; it.Next() {
			// FIX: the Value error was previously discarded with `_ =`,
			// silently dropping entries on read failure.
			if err := it.Item().Value(func(val []byte) error {
				out = append(out, string(val))
				return nil
			}); err != nil {
				return err
			}
		}
		return nil
	})
	return out, err
}
// RecentPostIDs enumerates the most recent posts stored by this relay
// across ALL authors, newest first. Used by the trending / recommendations
// endpoints to seed the candidate pool. maxAgeSeconds bounds the walk
// (0 = no bound); out-of-range limits (<=0 or >500) are clamped to 100.
func (fm *FeedMailbox) RecentPostIDs(maxAgeSeconds int64, limit int) ([]string, error) {
	if limit <= 0 || limit > 500 {
		limit = 100
	}
	// The chrono indices are per-author, so a global "recent" view has to
	// scan every stored body and sort on the decoded CreatedAt. This is
	// O(M) over the TTL-bounded post count — fine for the MVP (~5k posts
	// × 30d TTL on a busy node).
	var cutoff int64
	if maxAgeSeconds > 0 {
		cutoff = time.Now().Unix() - maxAgeSeconds
	}

	type entry struct {
		id string
		ts int64
	}
	var found []entry
	scanErr := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = []byte(feedPostPrefix)
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			if err := it.Item().Value(func(raw []byte) error {
				var p FeedPost
				if json.Unmarshal(raw, &p) != nil {
					return nil // skip corrupt records
				}
				if p.CreatedAt >= cutoff {
					found = append(found, entry{id: p.PostID, ts: p.CreatedAt})
				}
				return nil
			}); err != nil {
				return err
			}
		}
		return nil
	})
	if scanErr != nil {
		return nil, scanErr
	}

	sort.Slice(found, func(i, j int) bool { return found[i].ts > found[j].ts })
	if len(found) > limit {
		found = found[:limit]
	}
	ids := make([]string, len(found))
	for i, e := range found {
		ids[i] = e.id
	}
	return ids, nil
}
// hashtagRegex matches a leading '#' followed by 1..40 word characters,
// where "word" means ASCII letters/digits/underscore plus any Unicode
// letter (\p{L}).
var hashtagRegex = regexp.MustCompile(`#[A-Za-z0-9_\p{L}]{1,40}`)

// extractHashtags finds #word tokens in text, lowercases them, dedups, and
// preserves first-seen order.
//
// FIXES vs the original:
//   - the length bound now counts runes (matching the regex, which bounds
//     by characters) instead of bytes, so multi-byte Unicode tags of up to
//     40 characters are no longer wrongly dropped;
//   - the pointless `re := hashtagRegex` alias is gone;
//   - the doc comment now mentions \p{L}, which the regex always accepted.
func extractHashtags(text string) []string {
	matches := hashtagRegex.FindAllString(text, -1)
	seen := make(map[string]struct{}, len(matches))
	out := make([]string, 0, len(matches))
	for _, m := range matches {
		tag := strings.ToLower(strings.TrimPrefix(m, "#"))
		if n := utf8.RuneCountInString(tag); n == 0 || n > 40 {
			continue
		}
		if _, ok := seen[tag]; ok {
			continue
		}
		seen[tag] = struct{}{}
		out = append(out, tag)
	}
	return out
}
// estimatePostSize returns the approximate stored size of a post, used for
// the MaxPostBodySize check and fee calculation. Stated to match the
// client's pre-publish estimate so fees are predictable.
//
// NOTE(review): Attachment is counted as raw bytes, but encoding/json
// base64-encodes []byte (~4/3 inflation) in the stored record — confirm
// the client-side estimate uses the same raw-byte convention.
func estimatePostSize(post *FeedPost) uint64 {
	const metadataOverhead = 128 // ~120 bytes of JSON scaffolding, rounded up
	return uint64(len(post.Content)) + uint64(len(post.Attachment)) + metadataOverhead
}