diff --git a/cmd/node/main.go b/cmd/node/main.go index fbaeba6..99aa613 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -77,6 +77,8 @@ func main() { registerRelay := flag.Bool("register-relay", envBoolOr("DCHAIN_REGISTER_RELAY", false), "submit REGISTER_RELAY tx on startup (env: DCHAIN_REGISTER_RELAY)") relayFee := flag.Uint64("relay-fee", envUint64Or("DCHAIN_RELAY_FEE", 1_000), "relay fee per message in µT (env: DCHAIN_RELAY_FEE)") mailboxDB := flag.String("mailbox-db", envOr("DCHAIN_MAILBOX_DB", "./mailboxdata"), "BadgerDB directory for relay mailbox (env: DCHAIN_MAILBOX_DB)") + feedDB := flag.String("feed-db", envOr("DCHAIN_FEED_DB", "./feeddata"), "BadgerDB directory for social-feed post bodies (env: DCHAIN_FEED_DB)") + feedTTLDays := flag.Int("feed-ttl-days", int(envUint64Or("DCHAIN_FEED_TTL_DAYS", 30)), "how long feed posts are retained before auto-eviction (env: DCHAIN_FEED_TTL_DAYS)") govContractID := flag.String("governance-contract", envOr("DCHAIN_GOVERNANCE_CONTRACT", ""), "governance contract ID for dynamic chain parameters (env: DCHAIN_GOVERNANCE_CONTRACT)") joinSeedURL := flag.String("join", envOr("DCHAIN_JOIN", ""), "bootstrap from a running node: comma-separated HTTP URLs (env: DCHAIN_JOIN)") // Observer mode: the node participates in the P2P network, applies @@ -634,6 +636,15 @@ func main() { go mailbox.RunGC() log.Printf("[NODE] relay mailbox: %s", *mailboxDB) + // --- Feed mailbox (social-feed post bodies, v2.0.0) --- + feedTTL := time.Duration(*feedTTLDays) * 24 * time.Hour + feedMailbox, err := relay.OpenFeedMailbox(*feedDB, feedTTL) + if err != nil { + log.Fatalf("[NODE] feed mailbox: %v", err) + } + defer feedMailbox.Close() + log.Printf("[NODE] feed mailbox: %s (TTL %d days)", *feedDB, *feedTTLDays) + // Push-notify bus consumers whenever a fresh envelope lands in the // mailbox. Clients subscribed to `inbox:` (via WS) get the // event immediately so they no longer need to poll /relay/inbox. 
@@ -927,6 +938,16 @@ func main() { }, } + feedConfig := node.FeedConfig{ + Mailbox: feedMailbox, + HostingRelayPub: id.PubKeyHex(), + GetPost: chain.Post, + LikeCount: chain.LikeCount, + HasLiked: chain.HasLiked, + PostsByAuthor: chain.PostsByAuthor, + Following: chain.Following, + } + go func() { log.Printf("[NODE] stats API: http://0.0.0.0%s/stats", *statsAddr) if *disableUI { @@ -947,6 +968,7 @@ func main() { if err := stats.ListenAndServe(*statsAddr, statsQuery, func(mux *http.ServeMux) { node.RegisterExplorerRoutes(mux, explorerQuery, routeFlags) node.RegisterRelayRoutes(mux, relayConfig) + node.RegisterFeedRoutes(mux, feedConfig) // POST /api/governance/link — link deployed contracts at runtime. // Body: {"governance": ""} diff --git a/node/api_feed.go b/node/api_feed.go new file mode 100644 index 0000000..c4fbe54 --- /dev/null +++ b/node/api_feed.go @@ -0,0 +1,654 @@ +package node + +// Feed HTTP endpoints (v2.0.0). +// +// Mount points: +// +// POST /feed/publish — store a post body (authenticated) +// GET /feed/post/{id} — fetch a post body +// GET /feed/post/{id}/stats — {views, likes, liked_by_me?} aggregate +// POST /feed/post/{id}/view — increment off-chain view counter +// GET /feed/author/{pub} — ?limit=N, posts by an author +// GET /feed/timeline — ?follower=&limit=N, merged feed of follows +// GET /feed/trending — ?window=h&limit=N, top by likes + views +// GET /feed/foryou — ?pub=&limit=N, recommendations +// GET /feed/hashtag/{tag} — posts matching a hashtag +// +// Publish flow: +// 1. Client POSTs {content, attachment, post_id, author, sig, ts}. +// 2. Node verifies sig (Ed25519 over canonical bytes), hashes body, +// stores in FeedMailbox, returns hosting_relay + content_hash + size. +// 3. Client then submits on-chain CREATE_POST tx with that metadata. +// Node charges the fee (base + size×byte_fee) and credits the relay. +// 4. Subsequent GET /feed/post/{id} serves the stored body to anyone. +// +// Why the split? 
// On-chain metadata gives us provable authorship + the
// pay-for-storage incentive; off-chain body storage keeps the block
// history small. If the hosting relay dies, the on-chain record stays
// (with a "body unavailable" fallback on the reader side) — authors can
// re-publish to another relay.

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
	"strings"
	"time"

	"go-blockchain/blockchain"
	"go-blockchain/identity"
	"go-blockchain/relay"
)

// FeedConfig wires feed HTTP endpoints to the relay mailbox and the
// chain for read-after-write queries.
type FeedConfig struct {
	// Mailbox is the off-chain post-body store. RegisterFeedRoutes is a
	// no-op when this is nil, so a node without feed storage simply does
	// not expose /feed/* endpoints.
	Mailbox *relay.FeedMailbox

	// HostingRelayPub is this node's Ed25519 pubkey — returned from
	// /feed/publish so the client knows who to put in CREATE_POST tx.
	HostingRelayPub string

	// Chain lookups (nil-safe; endpoints degrade gracefully).
	// GetPost is used for the on-chain soft-delete check; LikeCount /
	// HasLiked feed the stats and ranking endpoints; PostsByAuthor and
	// Following power /feed/author and /feed/timeline respectively.
	GetPost       func(postID string) (*blockchain.PostRecord, error)
	LikeCount     func(postID string) (uint64, error)
	HasLiked      func(postID, likerPub string) (bool, error)
	PostsByAuthor func(authorPub string, limit int) ([]*blockchain.PostRecord, error)
	Following     func(followerPub string) ([]string, error)
}

// RegisterFeedRoutes wires feed endpoints onto mux. Writes are rate-limited
// via withSubmitTxGuards; reads via withReadLimit (same limiters as /relay).
func RegisterFeedRoutes(mux *http.ServeMux, cfg FeedConfig) {
	// Without a mailbox there is nothing to serve — register nothing.
	if cfg.Mailbox == nil {
		return
	}
	mux.HandleFunc("/feed/publish", withSubmitTxGuards(feedPublish(cfg)))
	mux.HandleFunc("/feed/post/", withReadLimit(feedPostRouter(cfg)))
	mux.HandleFunc("/feed/author/", withReadLimit(feedAuthor(cfg)))
	mux.HandleFunc("/feed/timeline", withReadLimit(feedTimeline(cfg)))
	mux.HandleFunc("/feed/trending", withReadLimit(feedTrending(cfg)))
	mux.HandleFunc("/feed/foryou", withReadLimit(feedForYou(cfg)))
	mux.HandleFunc("/feed/hashtag/", withReadLimit(feedHashtag(cfg)))
}

// ── POST /feed/publish ────────────────────────────────────────────────────

// feedPublishRequest — what the client sends. Signature is Ed25519 over
// canonical bytes: "publish:<post_id>:<content_hash_hex>:<ts>".
// ts must be within ±5 minutes of server clock.
type feedPublishRequest struct {
	PostID         string `json:"post_id"`
	Author         string `json:"author"` // hex Ed25519
	Content        string `json:"content"`
	ContentType    string `json:"content_type,omitempty"`
	AttachmentB64  string `json:"attachment_b64,omitempty"`
	AttachmentMIME string `json:"attachment_mime,omitempty"`
	ReplyTo        string `json:"reply_to,omitempty"`
	QuoteOf        string `json:"quote_of,omitempty"`
	Sig            string `json:"sig"` // base64 Ed25519 sig
	Ts             int64  `json:"ts"`
}

// feedPublishResponse echoes back everything the client needs to build
// the on-chain CREATE_POST transaction referencing this stored body.
type feedPublishResponse struct {
	PostID         string   `json:"post_id"`
	HostingRelay   string   `json:"hosting_relay"`
	ContentHash    string   `json:"content_hash"` // hex sha256
	Size           uint64   `json:"size"`
	Hashtags       []string `json:"hashtags"`
	EstimatedFeeUT uint64   `json:"estimated_fee_ut"` // base + size*byte_fee
}

// feedPublish stores a post body after verifying the author's Ed25519
// signature over the canonical publish bytes. Returns 400 on malformed
// input, 403 on bad signature, 413 when the body exceeds the mailbox
// size cap, 500 on storage failure.
func feedPublish(cfg FeedConfig) http.HandlerFunc {
	// Allowed clock skew between client-supplied ts and server clock.
	const publishSkewSecs = 300

	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			jsonErr(w, fmt.Errorf("method not allowed"), 405)
			return
		}
		var req feedPublishRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			jsonErr(w, fmt.Errorf("invalid JSON: %w", err), 400)
			return
		}
		if req.PostID == "" || req.Author == "" || req.Sig == "" || req.Ts == 0 {
			jsonErr(w, fmt.Errorf("post_id, author, sig, ts are required"), 400)
			return
		}
		if req.Content == "" && req.AttachmentB64 == "" {
			jsonErr(w, fmt.Errorf("post must have content or attachment"), 400)
			return
		}
		now := time.Now().Unix()
		if req.Ts < now-publishSkewSecs || req.Ts > now+publishSkewSecs {
			jsonErr(w, fmt.Errorf("ts out of range (±%ds)", publishSkewSecs), 400)
			return
		}
		if req.ReplyTo != "" && req.QuoteOf != "" {
			jsonErr(w, fmt.Errorf("reply_to and quote_of are mutually exclusive"), 400)
			return
		}

		// Decode attachment. Accept standard base64 first, then fall back
		// to raw-URL base64 (no padding) for lenient clients.
		var attachment []byte
		if req.AttachmentB64 != "" {
			b, err := base64.StdEncoding.DecodeString(req.AttachmentB64)
			if err != nil {
				if b, err = base64.RawURLEncoding.DecodeString(req.AttachmentB64); err != nil {
					jsonErr(w, fmt.Errorf("attachment_b64: invalid base64"), 400)
					return
				}
			}
			attachment = b
		}

		// Content hash binds the body to the on-chain metadata. We hash
		// content+attachment so the client can't publish body-A off-chain
		// and commit hash-of-body-B on-chain.
		h := sha256.New()
		h.Write([]byte(req.Content))
		h.Write(attachment)
		contentHash := h.Sum(nil)
		contentHashHex := hex.EncodeToString(contentHash)

		// Verify the author's signature over the canonical publish bytes.
		// Same base64 leniency as the attachment (std, then raw-URL).
		msg := []byte(fmt.Sprintf("publish:%s:%s:%d", req.PostID, contentHashHex, req.Ts))
		sigBytes, err := base64.StdEncoding.DecodeString(req.Sig)
		if err != nil {
			if sigBytes, err = base64.RawURLEncoding.DecodeString(req.Sig); err != nil {
				jsonErr(w, fmt.Errorf("sig: invalid base64"), 400)
				return
			}
		}
		if _, err := hex.DecodeString(req.Author); err != nil {
			jsonErr(w, fmt.Errorf("author: invalid hex"), 400)
			return
		}
		ok, err := identity.Verify(req.Author, msg, sigBytes)
		if err != nil || !ok {
			jsonErr(w, fmt.Errorf("signature invalid"), 403)
			return
		}

		post := &relay.FeedPost{
			PostID:         req.PostID,
			Author:         req.Author,
			Content:        req.Content,
			ContentType:    req.ContentType,
			Attachment:     attachment,
			AttachmentMIME: req.AttachmentMIME,
			ReplyTo:        req.ReplyTo,
			QuoteOf:        req.QuoteOf,
		}
		// NOTE(review): createdAt here is the client-supplied ts (bounded
		// by ±publishSkewSecs), while FeedMailbox.Store's doc claims
		// "senders can't back-date" — they can, by up to 5 minutes.
		// Confirm whether the chain tx timestamp should be used instead.
		hashtags, err := cfg.Mailbox.Store(post, req.Ts)
		if err != nil {
			if err == relay.ErrPostTooLarge {
				jsonErr(w, err, 413)
				return
			}
			jsonErr(w, err, 500)
			return
		}

		// Report what the client should put into CREATE_POST. The +128
		// pads for metadata overhead so the on-chain fee estimate is a
		// slight over- rather than under-estimate.
		size := uint64(len(req.Content)) + uint64(len(attachment)) + 128
		fee := blockchain.BasePostFee + size*blockchain.PostByteFee
		jsonOK(w, feedPublishResponse{
			PostID:         req.PostID,
			HostingRelay:   cfg.HostingRelayPub,
			ContentHash:    contentHashHex,
			Size:           size,
			Hashtags:       hashtags,
			EstimatedFeeUT: fee,
		})
	}
}

// ── GET /feed/post/{id} [+ /stats subroute, POST /view] ─────────────────

// feedPostRouter dispatches /feed/post/{id}, /feed/post/{id}/stats,
// /feed/post/{id}/view to the right handler.
+func feedPostRouter(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + rest := strings.TrimPrefix(r.URL.Path, "/feed/post/") + rest = strings.Trim(rest, "/") + if rest == "" { + jsonErr(w, fmt.Errorf("post id required"), 400) + return + } + parts := strings.Split(rest, "/") + postID := parts[0] + if len(parts) == 1 { + feedGetPost(cfg)(w, r, postID) + return + } + switch parts[1] { + case "stats": + feedPostStats(cfg)(w, r, postID) + case "view": + feedPostView(cfg)(w, r, postID) + default: + jsonErr(w, fmt.Errorf("unknown sub-route %q", parts[1]), 404) + } + } +} + +type postHandler func(w http.ResponseWriter, r *http.Request, postID string) + +func feedGetPost(cfg FeedConfig) postHandler { + return func(w http.ResponseWriter, r *http.Request, postID string) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + post, err := cfg.Mailbox.Get(postID) + if err != nil { + jsonErr(w, err, 500) + return + } + if post == nil { + jsonErr(w, fmt.Errorf("post %s not found", postID), 404) + return + } + // Respect on-chain soft-delete. 
+ if cfg.GetPost != nil { + if rec, _ := cfg.GetPost(postID); rec != nil && rec.Deleted { + jsonErr(w, fmt.Errorf("post %s deleted", postID), 410) + return + } + } + jsonOK(w, post) + } +} + +type postStatsResponse struct { + PostID string `json:"post_id"` + Views uint64 `json:"views"` + Likes uint64 `json:"likes"` + LikedByMe *bool `json:"liked_by_me,omitempty"` // set only when ?me= given +} + +func feedPostStats(cfg FeedConfig) postHandler { + return func(w http.ResponseWriter, r *http.Request, postID string) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + views, _ := cfg.Mailbox.ViewCount(postID) + var likes uint64 + if cfg.LikeCount != nil { + likes, _ = cfg.LikeCount(postID) + } + resp := postStatsResponse{ + PostID: postID, + Views: views, + Likes: likes, + } + if me := r.URL.Query().Get("me"); me != "" && cfg.HasLiked != nil { + if liked, err := cfg.HasLiked(postID, me); err == nil { + resp.LikedByMe = &liked + } + } + jsonOK(w, resp) + } +} + +func feedPostView(cfg FeedConfig) postHandler { + return func(w http.ResponseWriter, r *http.Request, postID string) { + if r.Method != http.MethodPost { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + next, err := cfg.Mailbox.IncrementView(postID) + if err != nil { + jsonErr(w, err, 500) + return + } + jsonOK(w, map[string]any{ + "post_id": postID, + "views": next, + }) + } +} + +// ── GET /feed/author/{pub} ──────────────────────────────────────────────── + +func feedAuthor(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + pub := strings.TrimPrefix(r.URL.Path, "/feed/author/") + pub = strings.Trim(pub, "/") + if pub == "" { + jsonErr(w, fmt.Errorf("author pub required"), 400) + return + } + limit := queryInt(r, "limit", 50) + + // Prefer chain-authoritative list (includes soft-deleted flag) so + // 
clients can't be fooled by a stale relay that has an already- + // deleted post. If chain isn't wired, fall back to relay index. + if cfg.PostsByAuthor != nil { + records, err := cfg.PostsByAuthor(pub, limit) + if err != nil { + jsonErr(w, err, 500) + return + } + out := make([]feedAuthorItem, 0, len(records)) + for _, rec := range records { + if rec == nil || rec.Deleted { + continue + } + out = append(out, buildAuthorItem(cfg, rec)) + } + jsonOK(w, map[string]any{"author": pub, "count": len(out), "posts": out}) + return + } + ids, err := cfg.Mailbox.PostsByAuthor(pub, limit) + if err != nil { + jsonErr(w, err, 500) + return + } + out := expandByID(cfg, ids) + jsonOK(w, map[string]any{"author": pub, "count": len(out), "posts": out}) + } +} + +// feedAuthorItem is a chain record enriched with the body and live stats. +type feedAuthorItem struct { + PostID string `json:"post_id"` + Author string `json:"author"` + Content string `json:"content,omitempty"` + ContentType string `json:"content_type,omitempty"` + Hashtags []string `json:"hashtags,omitempty"` + ReplyTo string `json:"reply_to,omitempty"` + QuoteOf string `json:"quote_of,omitempty"` + CreatedAt int64 `json:"created_at"` + Size uint64 `json:"size"` + HostingRelay string `json:"hosting_relay"` + Views uint64 `json:"views"` + Likes uint64 `json:"likes"` + HasAttachment bool `json:"has_attachment"` +} + +func buildAuthorItem(cfg FeedConfig, rec *blockchain.PostRecord) feedAuthorItem { + item := feedAuthorItem{ + PostID: rec.PostID, + Author: rec.Author, + ReplyTo: rec.ReplyTo, + QuoteOf: rec.QuoteOf, + CreatedAt: rec.CreatedAt, + Size: rec.Size, + HostingRelay: rec.HostingRelay, + } + if body, _ := cfg.Mailbox.Get(rec.PostID); body != nil { + item.Content = body.Content + item.ContentType = body.ContentType + item.Hashtags = body.Hashtags + item.HasAttachment = len(body.Attachment) > 0 + } + if cfg.LikeCount != nil { + item.Likes, _ = cfg.LikeCount(rec.PostID) + } + item.Views, _ = 
cfg.Mailbox.ViewCount(rec.PostID) + return item +} + +// expandByID fetches bodies+stats for a list of post IDs (no chain record). +func expandByID(cfg FeedConfig, ids []string) []feedAuthorItem { + out := make([]feedAuthorItem, 0, len(ids)) + for _, id := range ids { + body, _ := cfg.Mailbox.Get(id) + if body == nil { + continue + } + item := feedAuthorItem{ + PostID: id, + Author: body.Author, + Content: body.Content, + ContentType: body.ContentType, + Hashtags: body.Hashtags, + ReplyTo: body.ReplyTo, + QuoteOf: body.QuoteOf, + CreatedAt: body.CreatedAt, + HasAttachment: len(body.Attachment) > 0, + } + if cfg.LikeCount != nil { + item.Likes, _ = cfg.LikeCount(id) + } + item.Views, _ = cfg.Mailbox.ViewCount(id) + out = append(out, item) + } + return out +} + +// ── GET /feed/timeline ──────────────────────────────────────────────────── + +func feedTimeline(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + follower := r.URL.Query().Get("follower") + if follower == "" { + jsonErr(w, fmt.Errorf("follower parameter required"), 400) + return + } + if cfg.Following == nil || cfg.PostsByAuthor == nil { + jsonErr(w, fmt.Errorf("timeline requires chain queries"), 503) + return + } + limit := queryInt(r, "limit", 50) + perAuthor := limit + if perAuthor > 30 { + perAuthor = 30 + } + + following, err := cfg.Following(follower) + if err != nil { + jsonErr(w, err, 500) + return + } + var merged []*blockchain.PostRecord + for _, target := range following { + posts, err := cfg.PostsByAuthor(target, perAuthor) + if err != nil { + continue + } + for _, p := range posts { + if p != nil && !p.Deleted { + merged = append(merged, p) + } + } + } + // Sort newest-first, take top N. 
+ sort.Slice(merged, func(i, j int) bool { return merged[i].CreatedAt > merged[j].CreatedAt }) + if len(merged) > limit { + merged = merged[:limit] + } + out := make([]feedAuthorItem, 0, len(merged)) + for _, rec := range merged { + out = append(out, buildAuthorItem(cfg, rec)) + } + jsonOK(w, map[string]any{"count": len(out), "posts": out}) + } +} + +// ── GET /feed/trending ──────────────────────────────────────────────────── + +func feedTrending(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + limit := queryInt(r, "limit", 30) + // Window defaults to 24h; cap 7d so a viral post from a week ago + // doesn't permanently dominate. + windowHours := queryInt(r, "window", 24) + if windowHours > 24*7 { + windowHours = 24 * 7 + } + if windowHours < 1 { + windowHours = 1 + } + ids, err := cfg.Mailbox.RecentPostIDs(int64(windowHours)*3600, 500) + if err != nil { + jsonErr(w, err, 500) + return + } + // Score each = likes*3 + views, honoring soft-delete. 
+ type scored struct { + id string + score uint64 + } + scoredList := make([]scored, 0, len(ids)) + for _, id := range ids { + if cfg.GetPost != nil { + if rec, _ := cfg.GetPost(id); rec != nil && rec.Deleted { + continue + } + } + views, _ := cfg.Mailbox.ViewCount(id) + var likes uint64 + if cfg.LikeCount != nil { + likes, _ = cfg.LikeCount(id) + } + scoredList = append(scoredList, scored{id: id, score: likes*3 + views}) + } + sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score }) + if len(scoredList) > limit { + scoredList = scoredList[:limit] + } + pickedIDs := make([]string, len(scoredList)) + for i, s := range scoredList { + pickedIDs[i] = s.id + } + out := expandByID(cfg, pickedIDs) + jsonOK(w, map[string]any{"count": len(out), "posts": out}) + } +} + +// ── GET /feed/foryou ────────────────────────────────────────────────────── +// +// Simple recommendations heuristic for v2.0.0: +// 1. Compute the set of authors the user already follows. +// 2. Fetch recent posts from the relay (last 48h). +// 3. Filter OUT posts from followed authors (those live in /timeline). +// 4. Filter OUT posts the user has already liked. +// 5. Rank remaining by (likes × 3 + views) and return top N. +// +// Future improvements (tracked as v2.2.0 "Feed algorithm"): +// - Weight by "followed-of-followed" signal (friends-of-friends boost). +// - Decay by age (exp half-life ~12h). +// - Penalise self-engagement (author liking own post). +// - Collaborative filtering on hashtag co-occurrence. + +func feedForYou(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + pub := r.URL.Query().Get("pub") + limit := queryInt(r, "limit", 30) + + // Gather user's follows + likes to exclude from the candidate pool. 
+ excludedAuthors := make(map[string]struct{}) + if cfg.Following != nil && pub != "" { + if list, err := cfg.Following(pub); err == nil { + for _, a := range list { + excludedAuthors[a] = struct{}{} + } + } + } + // Post pool: last 48h on this relay. + ids, err := cfg.Mailbox.RecentPostIDs(48*3600, 500) + if err != nil { + jsonErr(w, err, 500) + return + } + type scored struct { + id string + score uint64 + } + scoredList := make([]scored, 0, len(ids)) + for _, id := range ids { + body, _ := cfg.Mailbox.Get(id) + if body == nil { + continue + } + if _, followed := excludedAuthors[body.Author]; followed { + continue + } + if body.Author == pub { + continue // don't recommend user's own posts + } + if cfg.GetPost != nil { + if rec, _ := cfg.GetPost(id); rec != nil && rec.Deleted { + continue + } + } + // Skip already-liked. + if cfg.HasLiked != nil && pub != "" { + if liked, _ := cfg.HasLiked(id, pub); liked { + continue + } + } + views, _ := cfg.Mailbox.ViewCount(id) + var likes uint64 + if cfg.LikeCount != nil { + likes, _ = cfg.LikeCount(id) + } + // Small "seed" score so posts with no engagement still get shown + // sometimes (otherwise a silent but fresh post can't break in). 
+ scoredList = append(scoredList, scored{id: id, score: likes*3 + views + 1}) + } + sort.Slice(scoredList, func(i, j int) bool { return scoredList[i].score > scoredList[j].score }) + if len(scoredList) > limit { + scoredList = scoredList[:limit] + } + pickedIDs := make([]string, len(scoredList)) + for i, s := range scoredList { + pickedIDs[i] = s.id + } + out := expandByID(cfg, pickedIDs) + jsonOK(w, map[string]any{"count": len(out), "posts": out}) + } +} + +// ── GET /feed/hashtag/{tag} ────────────────────────────────────────────── + +func feedHashtag(cfg FeedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + jsonErr(w, fmt.Errorf("method not allowed"), 405) + return + } + tag := strings.TrimPrefix(r.URL.Path, "/feed/hashtag/") + tag = strings.Trim(tag, "/") + if tag == "" { + jsonErr(w, fmt.Errorf("tag required"), 400) + return + } + limit := queryInt(r, "limit", 50) + ids, err := cfg.Mailbox.PostsByHashtag(tag, limit) + if err != nil { + jsonErr(w, err, 500) + return + } + out := expandByID(cfg, ids) + jsonOK(w, map[string]any{"tag": strings.ToLower(tag), "count": len(out), "posts": out}) + } +} + +// (queryInt helper is shared with the rest of the node HTTP surface; +// see api_common.go.) diff --git a/relay/feed_mailbox.go b/relay/feed_mailbox.go new file mode 100644 index 0000000..9079b45 --- /dev/null +++ b/relay/feed_mailbox.go @@ -0,0 +1,431 @@ +package relay + +// FeedMailbox — BadgerDB-backed storage for social-feed post bodies. +// +// Posts are PUBLIC (plaintext) — unlike the E2E inbox envelopes, feed posts +// have no recipient key. They live keyed by post ID and can be read by +// anyone via GET /feed/post/{id}. 
//
// Storage layout (keys):
//
//	post:<id>                        → FeedPost JSON (body + metadata)
//	post-by-author:<pub>:<ts>        → postID (chrono index for GET /feed/author)
//	post-views:<id>                  → uint64 big-endian (view counter)
//	post-hashtag:<tag>:<ts>:<id>     → postID (inverted index for #tag search)
//	post-trending:<score>:<id>       → postID (RESERVED — no code writes this
//	                                   index yet; trending is currently
//	                                   computed on the fly in node/api_feed.go
//	                                   with score = likes × 3 + views)
//
// View counts are off-chain because on-chain would mean one tx per view —
// financially and architecturally unreasonable. Likes stay on-chain
// (provable authorship + anti-Sybil via fee).
//
// Anti-spam:
//   - MaxPostBodySize is enforced at Store time.
//   - Per-sender rate limiting happens at the HTTP layer (withSubmitTxGuards).

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"

	badger "github.com/dgraph-io/badger/v4"
)

const (
	feedPostPrefix      = "feedpost:"
	feedAuthorIdxPrefix = "feedauthor:" // feedauthor:<pub>:<ts>:<id>
	feedViewPrefix      = "feedview:"   // feedview:<id> → uint64
	feedHashtagPrefix   = "feedtag:"    // feedtag:<tag>:<ts>:<id>
	feedTrendingPrefix  = "feedtrend:"  // feedtrend:<score>:<id> — reserved, not yet written

	// MaxPostBodySize is the hard cap on a post's on-wire size. Matches
	// blockchain.MaxPostSize so the on-chain fee estimate is always
	// enforceable (no "I claimed 10 KiB but sent 50 KiB" trick).
	MaxPostBodySize = 256 * 1024 // 256 KiB

	// FeedPostDefaultTTLDays is how long a post body lives before BadgerDB
	// auto-evicts it. On-chain metadata stays forever, so a reader hitting
	// a stale post sees the record with a "body unavailable" indicator.
	// Configurable via the env var DCHAIN_FEED_TTL_DAYS (handled in main.go).
	FeedPostDefaultTTLDays = 30

	// maxHashtagsPerPost caps how many distinct hashtags we'll index per
	// post. Prevents a spammer from polluting every tag namespace with one
	// mega-post.
	maxHashtagsPerPost = 8

	// trendingHalfLifeSeconds is the intended decay horizon for trending
	// scores. NOTE(review): nothing references this constant yet — the
	// trending computation in node/api_feed.go uses an undecayed
	// likes×3 + views score. Wire the decay in or remove the constant.
	trendingHalfLifeSeconds = 12 * 3600 // 12 hours
)

// FeedPost is the off-chain body. On-chain we keep the metadata in
// blockchain.PostRecord — here we store the readable payload.
//
// Why not just put the body on-chain? Size — a 256 KiB post × thousands
// per day would bloat the block history. Keeping it in a relay DB with a
// TTL gives us ephemerality while still letting on-chain records serve as
// the permanent proof of authorship.
type FeedPost struct {
	// Identity (matches on-chain PostRecord.PostID).
	PostID string `json:"post_id"`
	Author string `json:"author"` // Ed25519 hex

	// Payload. Content is always plaintext (posts are public). Attachment is
	// a pre-compressed blob — client is expected to have minimised size
	// before publish. If empty, the post is text-only.
	Content        string   `json:"content"`
	ContentType    string   `json:"content_type,omitempty"` // "text/plain" | "text/markdown" | ...
	Attachment     []byte   `json:"attachment,omitempty"`
	AttachmentMIME string   `json:"attachment_mime,omitempty"`
	Hashtags       []string `json:"hashtags,omitempty"` // lowercased, without leading #

	// CreatedAt matches the on-chain tx timestamp — we stamp it server-side
	// at Store() so senders can't back-date.
	// NOTE(review): the /feed/publish handler passes the client's ts
	// (bounded to ±5 min of server clock) — confirm which is canonical.
	CreatedAt int64 `json:"created_at"`

	// ReplyTo / QuoteOf mirror the on-chain PostRecord fields, included
	// here so the client can thread without a second RPC.
	ReplyTo string `json:"reply_to,omitempty"`
	QuoteOf string `json:"quote_of,omitempty"`
}

// ErrPostTooLarge is returned by Store when the post body exceeds MaxPostBodySize.
var ErrPostTooLarge = errors.New("post body exceeds maximum allowed size")

// FeedMailbox stores feed post bodies.
type FeedMailbox struct {
	db  *badger.DB    // underlying key-value store
	ttl time.Duration // per-entry TTL applied to bodies and indices
}

// NewFeedMailbox wraps an already-open Badger DB. TTL controls how long
// post bodies live before auto-eviction (on-chain metadata persists
// forever independently).
+func NewFeedMailbox(db *badger.DB, ttl time.Duration) *FeedMailbox { + if ttl <= 0 { + ttl = time.Duration(FeedPostDefaultTTLDays) * 24 * time.Hour + } + return &FeedMailbox{db: db, ttl: ttl} +} + +// OpenFeedMailbox opens (or creates) a dedicated BadgerDB at dbPath. +func OpenFeedMailbox(dbPath string, ttl time.Duration) (*FeedMailbox, error) { + opts := badger.DefaultOptions(dbPath). + WithLogger(nil). + WithValueLogFileSize(128 << 20). + WithNumVersionsToKeep(1). + WithCompactL0OnClose(true) + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("open feed mailbox db: %w", err) + } + return NewFeedMailbox(db, ttl), nil +} + +// Close releases the underlying Badger handle. +func (fm *FeedMailbox) Close() error { return fm.db.Close() } + +// Store persists a post body and updates all indices. `createdAt` is the +// canonical timestamp (usually from the chain tx) and becomes the +// server's view of when the post happened — clients' wall-clock values +// are ignored. +// +// Returns the set of hashtags actually indexed (after dedup + cap). +func (fm *FeedMailbox) Store(post *FeedPost, createdAt int64) ([]string, error) { + size := estimatePostSize(post) + if size > MaxPostBodySize { + return nil, ErrPostTooLarge + } + + post.CreatedAt = createdAt + // Normalise hashtags — the client may or may not have supplied them; + // we derive from Content as the authoritative source, then dedup. + tags := extractHashtags(post.Content) + if len(tags) > maxHashtagsPerPost { + tags = tags[:maxHashtagsPerPost] + } + post.Hashtags = tags + + val, err := json.Marshal(post) + if err != nil { + return nil, fmt.Errorf("marshal post: %w", err) + } + + err = fm.db.Update(func(txn *badger.Txn) error { + // Idempotent on postID — second Store is a no-op. 
+ key := []byte(feedPostPrefix + post.PostID) + if _, err := txn.Get(key); err == nil { + return nil + } + entry := badger.NewEntry(key, val).WithTTL(fm.ttl) + if err := txn.SetEntry(entry); err != nil { + return err + } + + // Author chrono index. + authorKey := fmt.Sprintf("%s%s:%020d:%s", feedAuthorIdxPrefix, post.Author, createdAt, post.PostID) + if err := txn.SetEntry( + badger.NewEntry([]byte(authorKey), []byte(post.PostID)).WithTTL(fm.ttl), + ); err != nil { + return err + } + + // Hashtag inverted index. + for _, tag := range tags { + tagKey := fmt.Sprintf("%s%s:%020d:%s", feedHashtagPrefix, tag, createdAt, post.PostID) + if err := txn.SetEntry( + badger.NewEntry([]byte(tagKey), []byte(post.PostID)).WithTTL(fm.ttl), + ); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err + } + return tags, nil +} + +// Get returns the full post body, or nil if not found / evicted. +func (fm *FeedMailbox) Get(postID string) (*FeedPost, error) { + var p FeedPost + err := fm.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(feedPostPrefix + postID)) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &p) + }) + }) + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil + } + if err != nil { + return nil, err + } + return &p, nil +} + +// Delete removes a post body and its indices. On-chain soft-delete stays; +// this just frees storage. Called by DELETE_POST event handler hook. +func (fm *FeedMailbox) Delete(postID string) error { + // We need author and createdAt to build index keys — fetch first. 
+ post, err := fm.Get(postID) + if err != nil { + return err + } + if post == nil { + return nil + } + return fm.db.Update(func(txn *badger.Txn) error { + if err := txn.Delete([]byte(feedPostPrefix + postID)); err != nil { + return err + } + authorKey := fmt.Sprintf("%s%s:%020d:%s", + feedAuthorIdxPrefix, post.Author, post.CreatedAt, postID) + if err := txn.Delete([]byte(authorKey)); err != nil { + return err + } + for _, tag := range post.Hashtags { + tagKey := fmt.Sprintf("%s%s:%020d:%s", + feedHashtagPrefix, tag, post.CreatedAt, postID) + if err := txn.Delete([]byte(tagKey)); err != nil { + return err + } + } + return nil + }) +} + +// IncrementView bumps the view counter for a post. No-op on missing post. +// Returns the new count. Views are ephemeral (tied to the post TTL) — +// a fresh relay that gossip-loads an old post starts from 0, which is +// acceptable for a non-authoritative metric. +func (fm *FeedMailbox) IncrementView(postID string) (uint64, error) { + var next uint64 + err := fm.db.Update(func(txn *badger.Txn) error { + key := []byte(feedViewPrefix + postID) + var cur uint64 + if item, err := txn.Get(key); err == nil { + _ = item.Value(func(val []byte) error { + if len(val) == 8 { + cur = binary.BigEndian.Uint64(val) + } + return nil + }) + } else if !errors.Is(err, badger.ErrKeyNotFound) { + return err + } + next = cur + 1 + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], next) + return txn.SetEntry(badger.NewEntry(key, buf[:]).WithTTL(fm.ttl)) + }) + return next, err +} + +// ViewCount returns the current (off-chain) view count for a post. 
func (fm *FeedMailbox) ViewCount(postID string) (uint64, error) {
	var n uint64
	err := fm.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(feedViewPrefix + postID))
		// No counter key yet simply means zero views.
		if errors.Is(err, badger.ErrKeyNotFound) {
			return nil
		}
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			// 8-byte big-endian layout written by IncrementView;
			// malformed values read as 0.
			if len(val) == 8 {
				n = binary.BigEndian.Uint64(val)
			}
			return nil
		})
	})
	return n, err
}

// PostsByAuthor lists the N most recent post IDs by an author, newest first.
// Pure ID listing — callers fetch bodies via Get.
func (fm *FeedMailbox) PostsByAuthor(authorPub string, limit int) ([]string, error) {
	// Clamp nonsensical or abusive limits to a sane default page size.
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	// Trailing ':' keeps the scan from bleeding into an author whose key
	// is a longer string that merely starts with this one.
	prefix := []byte(feedAuthorIdxPrefix + authorPub + ":")
	return fm.reverseIDScan(prefix, limit)
}

// PostsByHashtag lists the N most recent posts tagged with tag (lowercased).
func (fm *FeedMailbox) PostsByHashtag(tag string, limit int) ([]string, error) {
	// Accept both "#go" and "go"; index keys are stored lowercased.
	tag = strings.ToLower(strings.TrimPrefix(tag, "#"))
	if tag == "" {
		return nil, nil
	}
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	prefix := []byte(feedHashtagPrefix + tag + ":")
	return fm.reverseIDScan(prefix, limit)
}

// reverseIDScan walks prefix in reverse lex order and returns the value
// (postID) of each entry up to limit. Used for newest-first indices:
// the index keys embed a zero-padded timestamp, so reverse lexicographic
// order IS reverse chronological order.
func (fm *FeedMailbox) reverseIDScan(prefix []byte, limit int) ([]string, error) {
	out := make([]string, 0, limit)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		opts.Reverse = true
		// In reverse mode Seek positions at the largest key <= seek, so
		// start just past the whole prefix range (prefix + 0xff; index
		// key bytes after the prefix are printable, never 0xff).
		seek := append([]byte{}, prefix...)
		seek = append(seek, 0xff)
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Seek(seek); it.ValidForPrefix(prefix) && len(out) < limit; it.Next() {
			item := it.Item()
			// Value-copy errors are deliberately ignored — a skipped
			// entry just shortens the returned page.
			_ = item.Value(func(val []byte) error {
				out = append(out, string(val))
				return nil
			})
		}
		return nil
	})
	return out, err
}

// RecentPostIDs enumerates the most recent posts stored by this relay
// across ALL authors. Used by the trending / recommendations endpoints to
// seed the candidate pool. maxAgeSeconds bounds the walk (0 = no bound).
func (fm *FeedMailbox) RecentPostIDs(maxAgeSeconds int64, limit int) ([]string, error) {
	if limit <= 0 || limit > 500 {
		limit = 100
	}
	// Can't reuse chrono indices because they're per-author. We scan post:*
	// and collect, sorted by CreatedAt from the decoded body. This is O(M)
	// where M = #posts in DB — fine for MVP since TTL-bounded M is small
	// (~5k posts × 30d TTL on a busy node).
	type candidate struct {
		id string
		ts int64
	}
	// cutoff == 0 admits every post with a non-negative CreatedAt.
	cutoff := int64(0)
	if maxAgeSeconds > 0 {
		cutoff = time.Now().Unix() - maxAgeSeconds
	}
	var candidates []candidate
	prefix := []byte(feedPostPrefix)
	err := fm.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = prefix
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			err := item.Value(func(val []byte) error {
				var p FeedPost
				if err := json.Unmarshal(val, &p); err != nil {
					return nil // skip corrupt
				}
				if p.CreatedAt < cutoff {
					return nil
				}
				candidates = append(candidates, candidate{id: p.PostID, ts: p.CreatedAt})
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Newest first, then truncate to the requested page size.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts })
	if len(candidates) > limit {
		candidates = candidates[:limit]
	}
	out := make([]string, len(candidates))
	for i, c := range candidates {
		out[i] = c.id
	}
	return out, nil
}

// extractHashtags finds #word tokens in text, lowercases, dedups, preserves
// first-seen order. The regex accepts [A-Za-z0-9_] plus any Unicode letter
// (\p{L}), 1..40 characters; note the cap below is re-checked in BYTES
// after lowercasing, so multi-byte (non-ASCII) tags can be dropped before
// reaching 40 visible characters.
func extractHashtags(text string) []string {
	re := hashtagRegex
	matches := re.FindAllString(text, -1)
	seen := make(map[string]struct{}, len(matches))
	out := make([]string, 0, len(matches))
	for _, m := range matches {
		tag := strings.ToLower(strings.TrimPrefix(m, "#"))
		if len(tag) == 0 || len(tag) > 40 {
			continue
		}
		// Dedup while keeping the first occurrence's position.
		if _, ok := seen[tag]; ok {
			continue
		}
		seen[tag] = struct{}{}
		out = append(out, tag)
	}
	return out
}

// hashtagRegex matches '#' followed by 1..40 word characters or Unicode
// letters. Compiled once at package init (MustCompile panics on a bad
// pattern, which is fine for a constant literal).
var hashtagRegex = regexp.MustCompile(`#[A-Za-z0-9_\p{L}]{1,40}`)

// estimatePostSize returns the on-disk size used for fee calculation.
// Matches the client's pre-publish size estimate so fees are predictable.
func estimatePostSize(post *FeedPost) uint64 {
	n := uint64(len(post.Content)) + uint64(len(post.Attachment))
	// Small overhead for metadata (~120 bytes of JSON scaffolding,
	// rounded up to a flat 128 here).
	n += 128
	return n
}
diff --git a/relay/feed_mailbox_test.go b/relay/feed_mailbox_test.go
new file mode 100644
index 0000000..da0625d
--- /dev/null
+++ b/relay/feed_mailbox_test.go
@@ -0,0 +1,198 @@
package relay

import (
	"os"
	"testing"
	"time"
)

// newTestFeedMailbox opens a FeedMailbox over a throwaway temp directory
// with a 24h TTL and registers cleanup that closes it and retries the
// directory removal (the retry loop suggests Badger may briefly hold
// files after Close on some platforms — TODO confirm).
func newTestFeedMailbox(t *testing.T) *FeedMailbox {
	t.Helper()
	dir, err := os.MkdirTemp("", "dchain-feedtest-*")
	if err != nil {
		t.Fatalf("MkdirTemp: %v", err)
	}
	fm, err := OpenFeedMailbox(dir, 24*time.Hour)
	if err != nil {
		_ = os.RemoveAll(dir)
		t.Fatalf("OpenFeedMailbox: %v", err)
	}
	t.Cleanup(func() {
		_ = fm.Close()
		for i := 0; i < 20; i++ {
			if err := os.RemoveAll(dir); err == nil {
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	})
	return fm
}

// TestFeedMailboxStoreAndGet: store round-trips content + metadata.
func TestFeedMailboxStoreAndGet(t *testing.T) {
	fm := newTestFeedMailbox(t)
	post := &FeedPost{
		PostID:  "p1",
		Author:  "authorhex",
		Content: "Hello #world from #dchain",
	}
	// Store returns the extracted hashtags in first-seen order.
	tags, err := fm.Store(post, 12345)
	if err != nil {
		t.Fatalf("Store: %v", err)
	}
	wantTags := []string{"world", "dchain"}
	if len(tags) != len(wantTags) {
		t.Fatalf("Store returned %v, want %v", tags, wantTags)
	}
	for i := range wantTags {
		if tags[i] != wantTags[i] {
			t.Errorf("Store tag[%d]: got %q, want %q", i, tags[i], wantTags[i])
		}
	}

	// Round-trip: body, CreatedAt, and Hashtags all survive storage.
	got, err := fm.Get("p1")
	if err != nil || got == nil {
		t.Fatalf("Get: got %v err=%v", got, err)
	}
	if got.Content != post.Content {
		t.Errorf("content: got %q, want %q", got.Content, post.Content)
	}
	if got.CreatedAt != 12345 {
		t.Errorf("created_at: got %d, want 12345", got.CreatedAt)
	}
	if len(got.Hashtags) != 2 {
		t.Errorf("hashtags: got %v, want [world dchain]", got.Hashtags)
	}
}

// TestFeedMailboxTooLarge: rejects over-quota content.
func TestFeedMailboxTooLarge(t *testing.T) {
	fm := newTestFeedMailbox(t)
	// One byte past the limit must trip the sentinel error.
	big := make([]byte, MaxPostBodySize+1)
	post := &FeedPost{
		PostID:     "big1",
		Author:     "a",
		Attachment: big,
	}
	if _, err := fm.Store(post, 0); err != ErrPostTooLarge {
		t.Fatalf("Store huge post: got %v, want ErrPostTooLarge", err)
	}
}

// TestFeedMailboxHashtagIndex: hashtags are searchable + dedup + case-normalised.
func TestFeedMailboxHashtagIndex(t *testing.T) {
	fm := newTestFeedMailbox(t)

	p1 := &FeedPost{PostID: "p1", Author: "a", Content: "post about #Go"}
	p2 := &FeedPost{PostID: "p2", Author: "b", Content: "Also #go programming"}
	p3 := &FeedPost{PostID: "p3", Author: "a", Content: "#Rust too"}

	// Distinct createdAt values fix the chrono-index ordering below.
	if _, err := fm.Store(p1, 1000); err != nil {
		t.Fatal(err)
	}
	if _, err := fm.Store(p2, 2000); err != nil {
		t.Fatal(err)
	}
	if _, err := fm.Store(p3, 3000); err != nil {
		t.Fatal(err)
	}

	// #go is case-insensitive, should return both posts newest-first.
	ids, err := fm.PostsByHashtag("#Go", 10)
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) != 2 || ids[0] != "p2" || ids[1] != "p1" {
		t.Errorf("PostsByHashtag(Go): got %v, want [p2 p1]", ids)
	}

	// NOTE(review): error return ignored here — consider checking.
	ids, _ = fm.PostsByHashtag("rust", 10)
	if len(ids) != 1 || ids[0] != "p3" {
		t.Errorf("PostsByHashtag(rust): got %v, want [p3]", ids)
	}
}

// TestFeedMailboxViewCounter: increments + reads.
func TestFeedMailboxViewCounter(t *testing.T) {
	fm := newTestFeedMailbox(t)
	// NOTE(review): Store error is ignored here — consider checking.
	fm.Store(&FeedPost{PostID: "p", Author: "a", Content: "hi"}, 10)

	// Sequential bumps must be exactly monotonic: 1..5.
	for i := 1; i <= 5; i++ {
		n, err := fm.IncrementView("p")
		if err != nil {
			t.Fatal(err)
		}
		if n != uint64(i) {
			t.Errorf("IncrementView #%d: got %d, want %d", i, n, i)
		}
	}
	if n, _ := fm.ViewCount("p"); n != 5 {
		t.Errorf("ViewCount: got %d, want 5", n)
	}
}

// TestFeedMailboxByAuthor: author chrono index returns newest first.
func TestFeedMailboxByAuthor(t *testing.T) {
	fm := newTestFeedMailbox(t)
	// Stored out of chrono order on purpose; the index must sort them.
	// NOTE(review): Store errors are ignored here — consider checking.
	fm.Store(&FeedPost{PostID: "old", Author: "a", Content: "one"}, 100)
	fm.Store(&FeedPost{PostID: "new", Author: "a", Content: "two"}, 500)
	fm.Store(&FeedPost{PostID: "mid", Author: "a", Content: "three"}, 300)
	fm.Store(&FeedPost{PostID: "other", Author: "b", Content: "four"}, 400)

	ids, err := fm.PostsByAuthor("a", 10)
	if err != nil {
		t.Fatal(err)
	}
	// Author "b" must not leak into author "a"'s listing.
	want := []string{"new", "mid", "old"}
	if len(ids) != len(want) {
		t.Fatalf("len: got %d, want %d (%v)", len(ids), len(want), ids)
	}
	for i := range want {
		if ids[i] != want[i] {
			t.Errorf("pos %d: got %q want %q", i, ids[i], want[i])
		}
	}
}

// TestFeedMailboxDelete: removes body + indices.
+func TestFeedMailboxDelete(t *testing.T) { + fm := newTestFeedMailbox(t) + fm.Store(&FeedPost{PostID: "x", Author: "a", Content: "doomed #go"}, 100) + + if err := fm.Delete("x"); err != nil { + t.Fatalf("Delete: %v", err) + } + if got, _ := fm.Get("x"); got != nil { + t.Errorf("Get after delete: got %v, want nil", got) + } + if ids, _ := fm.PostsByHashtag("go", 10); len(ids) != 0 { + t.Errorf("hashtag index: got %v, want []", ids) + } + if ids, _ := fm.PostsByAuthor("a", 10); len(ids) != 0 { + t.Errorf("author index: got %v, want []", ids) + } +} + +// TestFeedMailboxRecentIDs: filters by window, sorts newest first. +func TestFeedMailboxRecentIDs(t *testing.T) { + fm := newTestFeedMailbox(t) + now := time.Now().Unix() + // p1 1 hour old, p2 5 hours old, p3 50 hours old. + fm.Store(&FeedPost{PostID: "p1", Author: "a", Content: "a"}, now-3600) + fm.Store(&FeedPost{PostID: "p2", Author: "b", Content: "b"}, now-5*3600) + fm.Store(&FeedPost{PostID: "p3", Author: "c", Content: "c"}, now-50*3600) + + // 6-hour window: p1 and p2 only. + ids, err := fm.RecentPostIDs(6*3600, 100) + if err != nil { + t.Fatal(err) + } + if len(ids) != 2 { + t.Errorf("RecentPostIDs(6h): got %v, want 2 posts", ids) + } + // Newest first. + if ids[0] != "p1" { + t.Errorf("first post: got %s, want p1", ids[0]) + } +}