diff --git a/apps/grpc/single/README.md b/apps/grpc/single/README.md index df1164124..27dfe0e2b 100644 --- a/apps/grpc/single/README.md +++ b/apps/grpc/single/README.md @@ -55,8 +55,7 @@ Start the Evolve node with: --root-dir ~/.grpc-single \ --grpc-executor-url http://localhost:50051 \ --da.address http://localhost:7980 \ - --da.auth-token your-da-token \ - --chain-id your-chain-id + --da.auth-token your-da-token ``` ## Command-Line Flags @@ -93,12 +92,11 @@ Start the Evolve node with: 3. Initialize and run the node: ```bash - ./grpc-single init --root-dir ~/.grpc-single + ./grpc-single init --root-dir ~/.grpc-single --chain-id test-chain ./grpc-single start \ --root-dir ~/.grpc-single \ --grpc-executor-url http://localhost:50051 \ - --da.address http://localhost:7980 \ - --chain-id test-chain + --da.address http://localhost:7980 ``` ## Architecture diff --git a/block/manager.go b/block/manager.go index 0fb06b2ca..d993f32b1 100644 --- a/block/manager.go +++ b/block/manager.go @@ -25,6 +25,7 @@ import ( "github.com/evstack/ev-node/pkg/cache" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" storepkg "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -415,6 +416,16 @@ func NewManager( return nil, fmt.Errorf("failed to load cache: %w", err) } + // Initialize DA visualization server if enabled + if config.RPC.EnableDAVisualization { + daVisualizationServer := server.NewDAVisualizationServer(da, logger.With().Str("module", "da_visualization").Logger(), config.Node.Aggregator) + server.SetDAVisualizationServer(daVisualizationServer) + logger.Info().Msg("DA visualization server enabled") + } else { + // Ensure the global server is nil when disabled + server.SetDAVisualizationServer(nil) + } + return m, nil } diff --git a/block/submitter.go b/block/submitter.go index dd30fb3cb..b0c3a4aa4 100644 --- a/block/submitter.go +++ b/block/submitter.go @@ -6,6 +6,7 @@ import ( "time" coreda "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/types" "google.golang.org/protobuf/proto" ) @@ -318,6 +319,12 @@ func handleSubmissionResult[T any]( case coreda.StatusContextCanceled: m.logger.Info().Int("attempt", retryStrategy.attempt).Msg("DA layer submission canceled due to context cancellation") + + // Record canceled submission in DA visualization server + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(&res, retryStrategy.gasPrice, uint64(len(remaining))) + } + return submissionOutcome[T]{ RemainingItems: remaining, RemainingMarshal: marshaled, @@ -347,6 +354,11 @@ func handleSuccessfulSubmission[T any]( remLen := len(remaining) allSubmitted := res.SubmittedCount == uint64(remLen) + // Record submission in DA visualization server + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(res, retryStrategy.gasPrice, res.SubmittedCount) + } + m.logger.Info().Str("itemType", itemType).Float64("gasPrice", retryStrategy.gasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer") submitted := remaining[:res.SubmittedCount] @@ -383,6 +395,11 @@ func handleMempoolFailure[T any]( m.recordDAMetrics("submission", DAModeFail) + // Record failed submission in DA visualization server + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(res, retryStrategy.gasPrice, uint64(len(remaining))) + } + gasMultiplier := m.getGasMultiplier(ctx) retryStrategy.BackoffOnMempool(int(m.config.DA.MempoolTTL), m.config.DA.BlockTime.Duration, gasMultiplier) m.logger.Info().Dur("backoff", retryStrategy.backoff).Float64("gasPrice", retryStrategy.gasPrice).Msg("retrying DA layer submission with") @@ -409,6 +426,17 @@ func handleTooBigError[T any]( m.recordDAMetrics("submission", DAModeFail) + // Record failed submission in DA visualization server (create a result for TooBig error) + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + tooBigResult := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusTooBig, + Message: "blob too big", + }, + } + daVisualizationServer.RecordSubmission(tooBigResult, retryStrategy.gasPrice, uint64(len(remaining))) + } + if len(remaining) > 1 { totalSubmitted, err := submitWithRecursiveSplitting(m, ctx, remaining, marshaled, retryStrategy.gasPrice, postSubmit, itemType, namespace) if err != nil { @@ -462,6 +490,12 @@ func handleGenericFailure[T any]( m.logger.Error().Str("error", res.Message).Int("attempt", attempt).Msg("DA layer submission failed") m.recordDAMetrics("submission", DAModeFail) + + // Record failed submission in DA visualization server + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(res, retryStrategy.gasPrice, uint64(len(remaining))) + } + retryStrategy.BackoffOnFailure() return submissionOutcome[T]{ @@ -662,12 +696,22 @@ func processBatch[T any]( postSubmit(submitted, &batchRes, gasPrice) m.logger.Info().Int("batchSize", len(batch.Items)).Uint64("submittedCount", batchRes.SubmittedCount).Msg("successfully submitted batch to DA layer") + // Record successful submission in DA visualization server + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(&batchRes, gasPrice, batchRes.SubmittedCount) + } + return batchResult[T]{ action: batchActionSubmitted, submittedCount: int(batchRes.SubmittedCount), } } + // Record failed submission in DA visualization server for all error cases + if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { + daVisualizationServer.RecordSubmission(&batchRes, gasPrice, uint64(len(batch.Items))) + } + if batchRes.Code == coreda.StatusTooBig && len(batch.Items) > 1 { // Batch is too big - let the caller handle splitting m.logger.Debug().Int("batchSize", len(batch.Items)).Msg("batch too big, returning to caller for splitting") diff --git a/docs/guides/da-visualizer.md b/docs/guides/da-visualizer.md new file mode 100644 index 000000000..063e9735e --- /dev/null +++ b/docs/guides/da-visualizer.md @@ -0,0 +1,240 @@ +# DA Visualizer + +The Data Availability (DA) Visualizer is a built-in monitoring tool in Evolve that provides real-time insights into blob submissions to the DA layer. It offers a web-based interface for tracking submission statistics, monitoring DA layer health, and analyzing blob details. + +**Note**: Only aggregator nodes submit data to the DA layer. Non-aggregator nodes will not display submission data. + +## Overview + +The DA Visualizer provides: + +- Real-time monitoring of blob submissions (last 100 submissions) +- Success/failure statistics and trends +- Gas price tracking and cost analysis +- DA layer health monitoring +- Detailed blob inspection capabilities +- Recent submission history + +## Enabling the DA Visualizer + +The DA Visualizer is disabled by default. To enable it, use the following configuration: + +### Via Command-line Flag + +```bash +testapp start --rollkit.rpc.enable_da_visualization +``` + +### Via Configuration File + +Add the following to your `evolve.yaml` configuration file: + +```yaml +rpc: + enable_da_visualization: true +``` + +## Accessing the DA Visualizer + +Once enabled, the DA Visualizer is accessible through your node's RPC server. By default, this is: + +``` +http://localhost:7331/da +``` + +The visualizer provides several API endpoints and a web interface: + +### Web Interface + +Navigate to `http://localhost:7331/da` in your web browser to access the interactive dashboard. + +### API Endpoints + +The following REST API endpoints are available for programmatic access: + +#### Get Recent Submissions + +```bash +GET /da/submissions +``` + +Returns the most recent blob submissions (up to 100 kept in memory). + +#### Get Blob Details + +```bash +GET /da/blob?id={blob_id} +``` + +Returns detailed information about a specific blob submission. + +#### Get DA Statistics + +```bash +GET /da/stats +``` + +Returns aggregated statistics including: + +- Total submissions count +- Success/failure rates +- Average gas price +- Total gas spent +- Average blob size +- Submission trends + +#### Get DA Health Status + +```bash +GET /da/health +``` + +Returns the current health status of the DA layer including: + +- Connection status +- Recent error rates +- Performance metrics +- Last successful submission timestamp + +## Features + +### Real-time Monitoring + +The dashboard automatically updates every 30 seconds, displaying: + +- Recent submission feed with status indicators (last 100 submissions) +- Success rate percentage +- Current gas price trends +- Submission history + +### Submission Details + +Each submission entry shows: + +- Timestamp +- Blob ID with link to detailed view +- Number of blobs in the batch +- Submission status (success/failure) +- Gas price used +- Error messages (if any) + +### Statistics Dashboard + +The statistics section provides: + +- **Performance Metrics**: Success rate, average submission time +- **Cost Analysis**: Total gas spent, average gas price over time +- **Volume Metrics**: Total blobs submitted, average blob size +- **Trend Analysis**: Hourly and daily submission patterns + +### Health Monitoring + +The health status indicator shows: + +- 🟢 **Healthy**: DA layer responding normally +- 🟡 **Warning**: Some failures but overall functional +- 🔴 **Critical**: High failure rate or connection issues + +## Use Cases + +### For Node Operators + +- Monitor the reliability of DA submissions +- Track gas costs and optimize gas price settings +- Identify patterns in submission failures +- Ensure DA layer connectivity + +### For Developers + +- Debug DA submission issues +- Analyze blob data structure +- Monitor application-specific submission patterns +- Test DA layer integration + +### For Network Monitoring + +- Track overall network DA usage +- Identify congestion periods +- Monitor gas price fluctuations +- Analyze submission patterns across the network + +## Configuration Options + +When enabling the DA Visualizer, you may want to adjust related RPC settings: + +```yaml +rpc: + address: "0.0.0.0:7331" # Bind to all interfaces for remote access + enable_da_visualization: true +``` + +**Security Note**: If binding to all interfaces (`0.0.0.0`), ensure proper firewall rules are in place to restrict access to trusted sources only. + +## Troubleshooting + +### Visualizer Not Accessible + +1. Verify the DA Visualizer is enabled: + - Check your configuration file or ensure the flag is set + - Look for log entries confirming "DA visualization endpoints registered" + +2. Check the RPC server is running: + - Verify the RPC address in logs + - Ensure no port conflicts + +3. For remote access: + - Ensure the RPC server is bound to an accessible interface + - Check firewall settings + +### No Data Displayed + +1. Verify your node is in aggregator mode (only aggregators submit to DA) +2. Check DA layer connectivity in the node logs +3. Ensure transactions are being processed +4. Note that the visualizer only keeps the last 100 submissions in memory + +### API Errors + +- **404 Not Found**: DA Visualizer not enabled +- **500 Internal Server Error**: Check node logs for DA connection issues +- **Empty responses**: No submissions have been made yet + +## Example Usage + +### Using curl to access the API + +```bash +# Get recent submissions (returns up to 100) +curl http://localhost:7331/da/submissions + +# Get specific blob details +curl http://localhost:7331/da/blob?id=abc123... + +# Get statistics +curl http://localhost:7331/da/stats + +# Check DA health +curl http://localhost:7331/da/health +``` + +### Monitoring with scripts + +```bash +#!/bin/bash +# Simple monitoring script + +while true; do + health=$(curl -s http://localhost:7331/da/health | jq -r '.status') + if [ "$health" != "healthy" ]; then + echo "DA layer issue detected: $health" + # Send alert... + fi + sleep 30 +done +``` + +## Related Configuration + +For complete DA layer configuration options, see the [Config Reference](../learn/config.md#data-availability-configuration-da). + +For metrics and monitoring setup, see the [Metrics Guide](./metrics.md). diff --git a/docs/learn/config.md b/docs/learn/config.md index c72abb3f2..289090f1f 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -35,6 +35,7 @@ This document provides a comprehensive reference for all configuration options a - [P2P Allowed Peers](#p2p-allowed-peers) - [RPC Configuration (`rpc`)](#rpc-configuration-rpc) - [RPC Server Address](#rpc-server-address) + - [Enable DA Visualization](#enable-da-visualization) - [Instrumentation Configuration (`instrumentation`)](#instrumentation-configuration-instrumentation) - [Enable Prometheus Metrics](#enable-prometheus-metrics) - [Prometheus Listen Address](#prometheus-listen-address) @@ -582,6 +583,26 @@ rpc: *Default:* `"127.0.0.1:7331"` *Constant:* `FlagRPCAddress` +### Enable DA Visualization + +**Description:** +If true, enables the Data Availability (DA) visualization endpoints that provide real-time monitoring of blob submissions to the DA layer. This includes a web-based dashboard and REST API endpoints for tracking submission statistics, monitoring DA health, and analyzing blob details. Only aggregator nodes submit data to the DA layer, so this feature is most useful when running in aggregator mode. + +**YAML:** + +```yaml +rpc: + enable_da_visualization: true +``` + +**Command-line Flag:** +`--rollkit.rpc.enable_da_visualization` (boolean, presence enables it) +*Example:* `--rollkit.rpc.enable_da_visualization` +*Default:* `false` +*Constant:* `FlagRPCEnableDAVisualization` + +See the [DA Visualizer Guide](/guides/da-visualizer.md) for detailed information on using this feature. + ## Instrumentation Configuration (`instrumentation`) Settings for enabling and configuring metrics and profiling endpoints, useful for monitoring node performance and debugging. diff --git a/docs/learn/specs/block-manager.md b/docs/learn/specs/block-manager.md index 1eed52c85..8ccbfba65 100644 --- a/docs/learn/specs/block-manager.md +++ b/docs/learn/specs/block-manager.md @@ -267,23 +267,23 @@ flowchart TD B -->|Mempool/Not Included| E[Mempool Backoff Strategy] B -->|Context Canceled| F[Stop Submission] B -->|Other Error| G[Exponential Backoff] - + D -->|Yes| H[Recursive Batch Splitting] D -->|No| I[Skip Single Item - Cannot Split] - + E --> J[Set Backoff = MempoolTTL * BlockTime] E --> K[Multiply Gas Price by GasMultiplier] - + G --> L[Double Backoff Time] G --> M[Cap at MaxBackoff - BlockTime] - + H --> N[Split into Two Halves] N --> O[Submit First Half] O --> P[Submit Second Half] P --> Q{Both Halves Processed?} Q -->|Yes| R[Combine Results] Q -->|No| S[Handle Partial Success] - + C --> T[Update Pending Queues] T --> U[Post-Submit Actions] ``` @@ -295,7 +295,7 @@ flowchart TD * Exponential backoff for general failures (doubles each attempt, capped at `BlockTime`) * Mempool-specific backoff (waits `MempoolTTL * BlockTime` for stuck transactions) * Success-based backoff reset with gas price reduction -* **Gas Price Management**: +* **Gas Price Management**: * Increases gas price by `GasMultiplier` on mempool failures * Decreases gas price after successful submissions (bounded by initial price) * Supports automatic gas price detection (`-1` value) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index eb6e7c12b..49719bd5e 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -96,9 +96,9 @@ func StartNode( return err } + // Resolve signer path relative to root directory if it's not an absolute path signerPath := nodeConfig.Signer.SignerPath if !filepath.IsAbs(signerPath) { - // Resolve relative signer path relative to root directory signerPath = filepath.Join(nodeConfig.RootDir, signerPath) } signer, err = file.LoadFileSystemSigner(signerPath, []byte(passphrase)) diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index ed037dc41..449a6ab1d 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -99,12 +99,15 @@ func TestParseFlags(t *testing.T) { t.Errorf("Error: %v", err) } + // Convert relative path to absolute for comparison + expectedRootDir, _ := filepath.Abs("custom/root/dir") + testCases := []struct { name string got any expected any }{ - {"RootDir", nodeConfig.RootDir, "custom/root/dir"}, + {"RootDir", nodeConfig.RootDir, expectedRootDir}, {"DBPath", nodeConfig.DBPath, "custom/db/path"}, // P2P fields diff --git a/pkg/config/config.go b/pkg/config/config.go index 853dc0467..e013f23bc 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -116,6 +116,8 @@ const ( // FlagRPCAddress is a flag for specifying the RPC server address FlagRPCAddress = "rollkit.rpc.address" + // FlagRPCEnableDAVisualization is a flag for enabling DA visualization endpoints + FlagRPCEnableDAVisualization = "rollkit.rpc.enable_da_visualization" ) // Config stores Rollkit configuration. @@ -222,7 +224,8 @@ type SignerConfig struct { // RPCConfig contains all RPC server configuration parameters type RPCConfig struct { - Address string `mapstructure:"address" yaml:"address" comment:"Address to bind the RPC server to (host:port). Default: 127.0.0.1:7331"` + Address string `mapstructure:"address" yaml:"address" comment:"Address to bind the RPC server to (host:port). Default: 127.0.0.1:7331"` + EnableDAVisualization bool `mapstructure:"enable_da_visualization" yaml:"enable_da_visualization" comment:"Enable DA visualization endpoints for monitoring blob submissions. Default: false"` } // Validate ensures that the root directory exists. @@ -293,6 +296,7 @@ func AddFlags(cmd *cobra.Command) { // RPC configuration flags cmd.Flags().String(FlagRPCAddress, def.RPC.Address, "RPC server address (host:port)") + cmd.Flags().Bool(FlagRPCEnableDAVisualization, def.RPC.EnableDAVisualization, "enable DA visualization endpoints for monitoring blob submissions") // Instrumentation configuration flags instrDef := DefaultInstrumentationConfig() @@ -316,6 +320,13 @@ func Load(cmd *cobra.Command) (Config, error) { home, _ := cmd.Flags().GetString(FlagRootDir) if home == "" { home = DefaultRootDir + } else if !filepath.IsAbs(home) { + // Convert relative path to absolute path + absHome, err := filepath.Abs(home) + if err != nil { + return Config{}, fmt.Errorf("failed to resolve home directory: %w", err) + } + home = absHome } v := viper.New() @@ -356,6 +367,13 @@ func LoadFromViper(inputViper *viper.Viper) (Config, error) { home := inputViper.GetString(FlagRootDir) if home == "" { home = DefaultRootDir + } else if !filepath.IsAbs(home) { + // Convert relative path to absolute path + absHome, err := filepath.Abs(home) + if err != nil { + return Config{}, fmt.Errorf("failed to resolve home directory: %w", err) + } + home = absHome } // create a new viper instance for reading the config file diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 233ade26a..884166f7d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -101,7 +101,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig.RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 37 // Update this number if you add more flag checks above + expectedFlagCount := 38 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/rpc/server/da_visualization.go b/pkg/rpc/server/da_visualization.go new file mode 100644 index 000000000..3d48362c0 --- /dev/null +++ b/pkg/rpc/server/da_visualization.go @@ -0,0 +1,517 @@ +package server + +import ( + "context" + _ "embed" + "encoding/hex" + "encoding/json" + "fmt" + "html/template" + "net/http" + "sync" + "time" + + coreda "github.com/evstack/ev-node/core/da" + "github.com/rs/zerolog" +) + +//go:embed templates/da_visualization.html +var daVisualizationHTML string + +// DASubmissionInfo represents information about a DA submission +type DASubmissionInfo struct { + ID string `json:"id"` + Height uint64 `json:"height"` + BlobSize uint64 `json:"blob_size"` + Timestamp time.Time `json:"timestamp"` + GasPrice float64 `json:"gas_price"` + StatusCode string `json:"status_code"` + Message string `json:"message,omitempty"` + NumBlobs uint64 `json:"num_blobs"` + BlobIDs []string `json:"blob_ids,omitempty"` +} + +// DAVisualizationServer provides DA layer visualization endpoints +type DAVisualizationServer struct { + da coreda.DA + logger zerolog.Logger + submissions []DASubmissionInfo + mutex sync.RWMutex + isAggregator bool +} + +// NewDAVisualizationServer creates a new DA visualization server +func NewDAVisualizationServer(da coreda.DA, logger zerolog.Logger, isAggregator bool) *DAVisualizationServer { + return &DAVisualizationServer{ + da: da, + logger: logger, + submissions: make([]DASubmissionInfo, 0), + isAggregator: isAggregator, + } +} + +// RecordSubmission records a DA submission for visualization +// Only keeps the last 100 submissions in memory for the dashboard display +func (s *DAVisualizationServer) RecordSubmission(result *coreda.ResultSubmit, gasPrice float64, numBlobs uint64) { + s.mutex.Lock() + defer s.mutex.Unlock() + + statusCode := s.getStatusCodeString(result.Code) + blobIDs := make([]string, len(result.IDs)) + for i, id := range result.IDs { + blobIDs[i] = hex.EncodeToString(id) + } + + submission := DASubmissionInfo{ + ID: fmt.Sprintf("submission_%d_%d", result.Height, time.Now().Unix()), + Height: result.Height, + BlobSize: result.BlobSize, + Timestamp: result.Timestamp, + GasPrice: gasPrice, + StatusCode: statusCode, + Message: result.Message, + NumBlobs: numBlobs, + BlobIDs: blobIDs, + } + + // Keep only the last 100 submissions in memory to avoid memory growth + // The HTML dashboard shows these recent submissions only + s.submissions = append(s.submissions, submission) + if len(s.submissions) > 100 { + s.submissions = s.submissions[1:] + } +} + +// getStatusCodeString converts status code to human-readable string +func (s *DAVisualizationServer) getStatusCodeString(code coreda.StatusCode) string { + switch code { + case coreda.StatusSuccess: + return "Success" + case coreda.StatusNotFound: + return "Not Found" + case coreda.StatusNotIncludedInBlock: + return "Not Included In Block" + case coreda.StatusAlreadyInMempool: + return "Already In Mempool" + case coreda.StatusTooBig: + return "Too Big" + case coreda.StatusContextDeadline: + return "Context Deadline" + case coreda.StatusError: + return "Error" + case coreda.StatusIncorrectAccountSequence: + return "Incorrect Account Sequence" + case coreda.StatusContextCanceled: + return "Context Canceled" + case coreda.StatusHeightFromFuture: + return "Height From Future" + default: + return "Unknown" + } +} + +// handleDASubmissions returns JSON list of recent DA submissions +// Note: This returns only the most recent submissions kept in memory (max 100) +func (s *DAVisualizationServer) handleDASubmissions(w http.ResponseWriter, r *http.Request) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // If not an aggregator, return empty submissions with a message + if !s.isAggregator { + response := map[string]interface{}{ + "is_aggregator": false, + "submissions": []DASubmissionInfo{}, + "total": 0, + "message": "This node is not an aggregator and does not submit to the DA layer", + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA submissions response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } + return + } + + // Reverse the slice to show newest first + reversed := make([]DASubmissionInfo, len(s.submissions)) + for i, j := 0, len(s.submissions)-1; j >= 0; i, j = i+1, j-1 { + reversed[i] = s.submissions[j] + } + + // Build response + response := map[string]interface{}{ + "submissions": reversed, + "total": len(reversed), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA submissions response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } +} + +// handleDABlobDetails returns details about a specific blob +func (s *DAVisualizationServer) handleDABlobDetails(w http.ResponseWriter, r *http.Request) { + blobID := r.URL.Query().Get("id") + if blobID == "" { + http.Error(w, "Missing blob ID parameter", http.StatusBadRequest) + return + } + + // Decode the hex blob ID + id, err := hex.DecodeString(blobID) + if err != nil { + http.Error(w, "Invalid blob ID format", http.StatusBadRequest) + return + } + + // Try to retrieve blob from DA layer + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + // Extract namespace - using empty namespace for now, could be parameterized + namespace := []byte{} + blobs, err := s.da.Get(ctx, []coreda.ID{id}, namespace) + if err != nil { + s.logger.Error().Err(err).Str("blob_id", blobID).Msg("Failed to retrieve blob from DA") + http.Error(w, fmt.Sprintf("Failed to retrieve blob: %v", err), http.StatusInternalServerError) + return + } + + if len(blobs) == 0 { + http.Error(w, "Blob not found", http.StatusNotFound) + return + } + + // Parse the blob ID to extract height and commitment + height, commitment, err := coreda.SplitID(id) + if err != nil { + s.logger.Error().Err(err).Str("blob_id", blobID).Msg("Failed to split blob ID") + } + + blob := blobs[0] + response := map[string]interface{}{ + "id": blobID, + "height": height, + "commitment": hex.EncodeToString(commitment), + "size": len(blob), + "content": hex.EncodeToString(blob), + "content_preview": string(blob[:min(len(blob), 200)]), // First 200 bytes as string + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode blob details response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } +} + +// handleDAStats returns aggregated statistics about DA submissions +func (s *DAVisualizationServer) handleDAStats(w http.ResponseWriter, r *http.Request) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // If not an aggregator, return empty stats + if !s.isAggregator { + stats := map[string]interface{}{ + "is_aggregator": false, + "total_submissions": 0, + "message": "This node is not an aggregator and does not submit to the DA layer", + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA stats response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } + return + } + + // Calculate statistics + var ( + totalSubmissions = len(s.submissions) + successCount int + errorCount int + totalBlobSize uint64 + totalGasPrice float64 + avgBlobSize float64 + avgGasPrice float64 + successRate float64 + ) + + for _, submission := range s.submissions { + switch submission.StatusCode { + case "Success": + successCount++ + case "Error": + errorCount++ + } + totalBlobSize += submission.BlobSize + totalGasPrice += submission.GasPrice + } + + if totalSubmissions > 0 { + avgBlobSize = float64(totalBlobSize) / float64(totalSubmissions) + avgGasPrice = totalGasPrice / float64(totalSubmissions) + successRate = float64(successCount) / float64(totalSubmissions) * 100 + } + + // Get time range + var firstSubmission, lastSubmission *time.Time + if totalSubmissions > 0 { + firstSubmission = &s.submissions[0].Timestamp + lastSubmission = &s.submissions[len(s.submissions)-1].Timestamp + } + + stats := map[string]interface{}{ + "total_submissions": totalSubmissions, + "success_count": successCount, + "error_count": errorCount, + "success_rate": fmt.Sprintf("%.2f%%", successRate), + "total_blob_size": totalBlobSize, + "avg_blob_size": avgBlobSize, + "avg_gas_price": avgGasPrice, + "time_range": map[string]interface{}{ + "first": firstSubmission, + "last": lastSubmission, + }, + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(stats); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA stats response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } +} + +// handleDAHealth returns health status of the DA layer connection +func (s *DAVisualizationServer) handleDAHealth(w http.ResponseWriter, r *http.Request) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // If not an aggregator, return simplified health status + if !s.isAggregator { + health := map[string]interface{}{ + "is_aggregator": false, + "status": "n/a", + "message": "This node is not an aggregator and does not submit to the DA layer", + "connection_status": "n/a", + "timestamp": time.Now(), + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(health); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA health response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } + return + } + + // Calculate health metrics + var ( + lastSuccessTime *time.Time + lastErrorTime *time.Time + recentErrors int + recentSuccesses int + lastSubmissionTime *time.Time + errorRate float64 + isHealthy bool + healthStatus string + healthIssues []string + ) + + // Look at recent submissions (last 10 or all if less) + recentCount := 10 + if len(s.submissions) < recentCount { + recentCount = len(s.submissions) + } + + // Analyze recent submissions + for i := len(s.submissions) - recentCount; i < len(s.submissions); i++ { + if i < 0 { + continue + } + submission := s.submissions[i] + switch submission.StatusCode { + case "Success": + recentSuccesses++ + if lastSuccessTime == nil || submission.Timestamp.After(*lastSuccessTime) { + lastSuccessTime = &submission.Timestamp + } + case "Error": + recentErrors++ + if lastErrorTime == nil || submission.Timestamp.After(*lastErrorTime) { + lastErrorTime = &submission.Timestamp + } + } + } + + // Get the last submission time + if len(s.submissions) > 0 { + lastSubmissionTime = &s.submissions[len(s.submissions)-1].Timestamp + } + + // Calculate error rate for recent submissions + if recentCount > 0 { + errorRate = float64(recentErrors) / float64(recentCount) * 100 + } + + // Determine health status based on criteria + isHealthy = true + healthStatus = "healthy" + + // Check error rate threshold (>20% is unhealthy) + if errorRate > 20 { + isHealthy = false + healthStatus = "degraded" + healthIssues = append(healthIssues, fmt.Sprintf("High error rate: %.1f%%", errorRate)) + } + + // Check if we haven't had a successful submission in the last 5 minutes + if lastSuccessTime != nil && time.Since(*lastSuccessTime) > 5*time.Minute { + isHealthy = false + healthStatus = "unhealthy" + healthIssues = append(healthIssues, fmt.Sprintf("No successful submissions for %v", time.Since(*lastSuccessTime).Round(time.Second))) + } + + // Check if DA layer appears to be stalled (no submissions at all in last 2 minutes) + if lastSubmissionTime != nil && time.Since(*lastSubmissionTime) > 2*time.Minute { + healthStatus = "warning" + healthIssues = append(healthIssues, fmt.Sprintf("No submissions for %v", time.Since(*lastSubmissionTime).Round(time.Second))) + } + + // If no submissions at all + if len(s.submissions) == 0 { + healthStatus = "unknown" + healthIssues = append(healthIssues, "No submissions recorded yet") + } + + // Test DA layer connectivity (attempt a simple operation) + var connectionStatus string + connectionHealthy := false + + // Try to validate the DA layer is responsive + ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) + defer cancel() + + // Check if DA layer responds to basic operations + // This is a non-invasive check - we're just testing responsiveness + select { + case <-ctx.Done(): + connectionStatus = "timeout" + default: + // DA layer is at least instantiated + if s.da != nil { + connectionStatus = "connected" + connectionHealthy = true + } else { + connectionStatus = "disconnected" + isHealthy = false + healthStatus = "unhealthy" + healthIssues = append(healthIssues, "DA layer not initialized") + } + } + + health := map[string]interface{}{ + "status": healthStatus, + "is_healthy": isHealthy, + "connection_status": connectionStatus, + "connection_healthy": connectionHealthy, + "metrics": map[string]interface{}{ + "recent_error_rate": fmt.Sprintf("%.1f%%", errorRate), + "recent_errors": recentErrors, + "recent_successes": recentSuccesses, + "recent_sample_size": recentCount, + "total_submissions": len(s.submissions), + "last_submission_time": lastSubmissionTime, + "last_success_time": lastSuccessTime, + "last_error_time": lastErrorTime, + }, + "issues": healthIssues, + "timestamp": time.Now(), + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(health); err != nil { + s.logger.Error().Err(err).Msg("Failed to encode DA health response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } +} + +// handleDAVisualizationHTML returns HTML visualization page +func (s *DAVisualizationServer) handleDAVisualizationHTML(w http.ResponseWriter, r *http.Request) { + s.mutex.RLock() + submissions := make([]DASubmissionInfo, len(s.submissions)) + copy(submissions, s.submissions) + s.mutex.RUnlock() + + // Reverse the slice to show newest first + for i, j := 0, len(submissions)-1; i < j; i, j = i+1, j-1 { + submissions[i], submissions[j] = submissions[j], submissions[i] + } + + t, err := template.New("da").Funcs(template.FuncMap{ + "slice": func(s string, start, end int) string { + if end > len(s) { + end = len(s) + } + return s[start:end] + }, + "len": func(items interface{}) int { + // Handle different types gracefully + switch v := items.(type) { + case []DASubmissionInfo: + return len(v) + case struct { + Submissions []DASubmissionInfo + LastUpdate string + IsAggregator bool + }: + return len(v.Submissions) + default: + return 0 + } + }, + }).Parse(daVisualizationHTML) + + if err != nil { + s.logger.Error().Err(err).Msg("Failed to parse template") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Create template data with LastUpdate + data := struct { + Submissions []DASubmissionInfo + LastUpdate string + IsAggregator bool + }{ + Submissions: submissions, + LastUpdate: time.Now().Format("15:04:05"), + IsAggregator: s.isAggregator, + } + + w.Header().Set("Content-Type", "text/html") + if err := t.Execute(w, data); err != nil { + s.logger.Error().Err(err).Msg("Failed to execute template") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } +} + +// Global DA visualization server instance +var daVisualizationServer *DAVisualizationServer +var daVisualizationMutex sync.Mutex + +// SetDAVisualizationServer sets the global DA visualization server instance +func SetDAVisualizationServer(server *DAVisualizationServer) { + daVisualizationMutex.Lock() + defer daVisualizationMutex.Unlock() + daVisualizationServer = server +} + +// GetDAVisualizationServer returns the global DA visualization server instance +func GetDAVisualizationServer() *DAVisualizationServer { + daVisualizationMutex.Lock() + defer daVisualizationMutex.Unlock() + return daVisualizationServer +} diff --git a/pkg/rpc/server/da_visualization_non_aggregator_test.go b/pkg/rpc/server/da_visualization_non_aggregator_test.go new file mode 100644 index 000000000..1d197aadc --- /dev/null +++ b/pkg/rpc/server/da_visualization_non_aggregator_test.go @@ -0,0 +1,170 @@ +package server + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/evstack/ev-node/test/mocks" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNonAggregatorDAVisualizationServer(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create a non-aggregator server + server := NewDAVisualizationServer(da, logger, false) + + assert.NotNil(t, server) + assert.Equal(t, da, server.da) + assert.Equal(t, 0, len(server.submissions)) + assert.False(t, server.isAggregator) +} + +func TestNonAggregatorHandleDASubmissions(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create a non-aggregator server + server := NewDAVisualizationServer(da, logger, false) + + // Create test request + req, err := http.NewRequest("GET", "/da/submissions", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDASubmissions(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + + var response map[string]interface{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, false, response["is_aggregator"]) + assert.Equal(t, float64(0), response["total"]) + assert.Contains(t, response["message"], "not an aggregator") + + submissions, ok := response["submissions"].([]interface{}) + require.True(t, ok) + assert.Equal(t, 0, len(submissions)) +} + +func TestNonAggregatorHandleDAStats(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create a non-aggregator server + server := NewDAVisualizationServer(da, logger, false) + + // Create test request + req, err := http.NewRequest("GET", "/da/stats", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDAStats(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + + var response map[string]interface{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, false, response["is_aggregator"]) + assert.Equal(t, float64(0), response["total_submissions"]) + assert.Contains(t, response["message"], "not an aggregator") +} + +func TestNonAggregatorHandleDAHealth(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create a non-aggregator server + server := NewDAVisualizationServer(da, logger, false) + + // Create test request + req, err := http.NewRequest("GET", "/da/health", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDAHealth(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + + var response map[string]interface{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, false, response["is_aggregator"]) + assert.Equal(t, "n/a", response["status"]) + assert.Equal(t, "n/a", response["connection_status"]) + assert.Contains(t, response["message"], "not an aggregator") +} + +func TestNonAggregatorHandleDAVisualizationHTML(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create a non-aggregator server + server := NewDAVisualizationServer(da, logger, false) + + req, err := http.NewRequest("GET", "/da", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDAVisualizationHTML(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) + + body := rr.Body.String() + assert.Contains(t, body, "DA Layer Visualization") + assert.Contains(t, body, "Non-aggregator") + assert.Contains(t, body, "does not submit data to the DA layer") + + // Should not show API endpoints for non-aggregator + assert.NotContains(t, body, "Available API Endpoints") + + // Should not show recent submissions table + assert.NotContains(t, body, "
Timestamp | ") +} + +func TestAggregatorWithNoSubmissionsHTML(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + // Create an aggregator server but don't add any submissions + server := NewDAVisualizationServer(da, logger, true) + + req, err := http.NewRequest("GET", "/da", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDAVisualizationHTML(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) + + body := rr.Body.String() + assert.Contains(t, body, "DA Layer Visualization") + + // Should show API endpoints for aggregator + assert.Contains(t, body, "Available API Endpoints") + + // Should show message about no submissions yet + assert.Contains(t, body, "No submissions recorded yet") + assert.Contains(t, body, "This aggregator node has not submitted any data") + + // Should not show non-aggregator message + assert.NotContains(t, body, "Non-aggregator") + assert.NotContains(t, strings.ToLower(body), "non-aggregator nodes do not submit") +} diff --git a/pkg/rpc/server/da_visualization_test.go b/pkg/rpc/server/da_visualization_test.go new file mode 100644 index 000000000..4bef68d49 --- /dev/null +++ b/pkg/rpc/server/da_visualization_test.go @@ -0,0 +1,309 @@ +package server + +import ( + "encoding/hex" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + coreda "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/test/mocks" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewDAVisualizationServer(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + + server := NewDAVisualizationServer(da, logger, true) + + assert.NotNil(t, server) + assert.Equal(t, da, server.da) + assert.Equal(t, 0, len(server.submissions)) +} + +func TestRecordSubmission(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Test recording a successful submission + result := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: 100, + BlobSize: 1024, + Timestamp: time.Now(), + Message: "Success", + IDs: [][]byte{[]byte("test-id-1"), []byte("test-id-2")}, + SubmittedCount: 2, + }, + } + + server.RecordSubmission(result, 0.5, 2) + + assert.Equal(t, 1, len(server.submissions)) + submission := server.submissions[0] + assert.Equal(t, uint64(100), submission.Height) + assert.Equal(t, uint64(1024), submission.BlobSize) + assert.Equal(t, 0.5, submission.GasPrice) + assert.Equal(t, "Success", submission.StatusCode) + assert.Equal(t, uint64(2), submission.NumBlobs) + assert.Equal(t, 2, len(submission.BlobIDs)) + assert.Equal(t, hex.EncodeToString([]byte("test-id-1")), submission.BlobIDs[0]) + assert.Equal(t, hex.EncodeToString([]byte("test-id-2")), submission.BlobIDs[1]) +} + +func TestRecordSubmissionMemoryLimit(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Add 101 submissions (more than the limit of 100) + for i := 0; i < 101; i++ { + result := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: uint64(i), + BlobSize: uint64(i * 10), + Timestamp: time.Now(), + }, + } + server.RecordSubmission(result, float64(i)*0.1, 1) + } + + // Should only keep the last 100 submissions + assert.Equal(t, 100, len(server.submissions)) + // First submission should be height 1 (height 0 was dropped) + assert.Equal(t, uint64(1), server.submissions[0].Height) + // Last submission should be height 100 + assert.Equal(t, uint64(100), server.submissions[99].Height) +} + +func TestGetStatusCodeString(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + tests := []struct { + code coreda.StatusCode + expected string + }{ + {coreda.StatusSuccess, "Success"}, + {coreda.StatusNotFound, "Not Found"}, + {coreda.StatusError, "Error"}, + {coreda.StatusTooBig, "Too Big"}, + {coreda.StatusContextDeadline, "Context Deadline"}, + {coreda.StatusUnknown, "Unknown"}, + } + + for _, tt := range tests { + result := server.getStatusCodeString(tt.code) + assert.Equal(t, tt.expected, result) + } +} + +func TestHandleDASubmissions(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Add a test submission + result := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: 100, + BlobSize: 1024, + Timestamp: time.Now(), + IDs: [][]byte{[]byte("test-id")}, + }, + } + server.RecordSubmission(result, 0.5, 1) + + // Create test request + req, err := http.NewRequest("GET", "/da/submissions", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDASubmissions(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + + var response map[string]interface{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, float64(1), response["total"]) + submissions, ok := response["submissions"].([]interface{}) + require.True(t, ok) + assert.Equal(t, 1, len(submissions)) + + submission := submissions[0].(map[string]interface{}) + assert.Equal(t, float64(100), submission["height"]) + assert.Equal(t, float64(1024), submission["blob_size"]) + assert.Equal(t, 0.5, submission["gas_price"]) + assert.Equal(t, "Success", submission["status_code"]) +} + +func TestHandleDABlobDetailsMissingID(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + req, err := http.NewRequest("GET", "/da/blob", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDABlobDetails(rr, req) + + assert.Equal(t, http.StatusBadRequest, rr.Code) + assert.Contains(t, rr.Body.String(), "Missing blob ID parameter") +} + +func TestHandleDABlobDetailsInvalidID(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + req, err := http.NewRequest("GET", "/da/blob?id=invalid-hex", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDABlobDetails(rr, req) + + assert.Equal(t, http.StatusBadRequest, rr.Code) + assert.Contains(t, rr.Body.String(), "Invalid blob ID format") +} + +func TestHandleDAVisualizationHTML(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Add a test submission + result := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: 100, + BlobSize: 1024, + Timestamp: time.Now(), + Message: "Test submission", + }, + } + server.RecordSubmission(result, 0.5, 1) + + req, err := http.NewRequest("GET", "/da", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + server.handleDAVisualizationHTML(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) + + body := rr.Body.String() + assert.Contains(t, body, "DA Layer Visualization") + assert.Contains(t, body, "100") // Height + assert.Contains(t, body, "Success") // Status + assert.Contains(t, body, "1024") // Size + assert.Contains(t, body, "Test submission") // Message +} + +func TestGlobalDAVisualizationServer(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Initially should be nil + assert.Nil(t, GetDAVisualizationServer()) + + // Set the server + SetDAVisualizationServer(server) + assert.Equal(t, server, GetDAVisualizationServer()) + + // Set to nil + SetDAVisualizationServer(nil) + assert.Nil(t, GetDAVisualizationServer()) +} + +func TestRegisterCustomHTTPEndpointsDAVisualization(t *testing.T) { + da := &mocks.MockDA{} + logger := zerolog.New(nil) + server := NewDAVisualizationServer(da, logger, true) + + // Add test submission + result := &coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: 100, + BlobSize: 1024, + Timestamp: time.Now(), + }, + } + server.RecordSubmission(result, 0.5, 1) + + // Set global server + SetDAVisualizationServer(server) + defer SetDAVisualizationServer(nil) + + // Create mux and register endpoints + mux := http.NewServeMux() + RegisterCustomHTTPEndpoints(mux) + + // Test /da endpoint + req, err := http.NewRequest("GET", "/da", nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + mux.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "text/html", rr.Header().Get("Content-Type")) + + // Test /da/submissions endpoint + req, err = http.NewRequest("GET", "/da/submissions", nil) + require.NoError(t, err) + + rr = httptest.NewRecorder() + mux.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + + // Test /da/blob endpoint (missing ID should return 400) + req, err = http.NewRequest("GET", "/da/blob", nil) + require.NoError(t, err) + + rr = httptest.NewRecorder() + mux.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusBadRequest, rr.Code) +} + +func TestRegisterCustomHTTPEndpointsWithoutServer(t *testing.T) { + // Ensure no server is set + SetDAVisualizationServer(nil) + + mux := http.NewServeMux() + RegisterCustomHTTPEndpoints(mux) + + // Test that endpoints return service unavailable when server is not set + endpoints := []string{"/da", "/da/submissions", "/da/blob"} + + for _, endpoint := range endpoints { + req, err := http.NewRequest("GET", endpoint, nil) + require.NoError(t, err) + + rr := httptest.NewRecorder() + mux.ServeHTTP(rr, req) + + assert.Equal(t, http.StatusServiceUnavailable, rr.Code) + assert.Contains(t, strings.ToLower(rr.Body.String()), "not available") + } +} diff --git a/pkg/rpc/server/http.go b/pkg/rpc/server/http.go index 911e6913d..b90a738ff 100644 --- a/pkg/rpc/server/http.go +++ b/pkg/rpc/server/http.go @@ -14,6 +14,52 @@ func RegisterCustomHTTPEndpoints(mux *http.ServeMux) { fmt.Fprintln(w, "OK") }) + // DA Visualization endpoints + mux.HandleFunc("/da", func(w http.ResponseWriter, r *http.Request) { + server := GetDAVisualizationServer() + if server == nil { + http.Error(w, "DA visualization not available", http.StatusServiceUnavailable) + return + } + server.handleDAVisualizationHTML(w, r) + }) + + mux.HandleFunc("/da/submissions", func(w http.ResponseWriter, r *http.Request) { + server := GetDAVisualizationServer() + if server == nil { + http.Error(w, "DA visualization not available", http.StatusServiceUnavailable) + return + } + server.handleDASubmissions(w, r) + }) + + mux.HandleFunc("/da/blob", func(w http.ResponseWriter, r *http.Request) { + server := GetDAVisualizationServer() + if server == nil { + http.Error(w, "DA visualization not available", http.StatusServiceUnavailable) + return + } + server.handleDABlobDetails(w, r) + }) + + mux.HandleFunc("/da/stats", func(w http.ResponseWriter, r *http.Request) { + server := GetDAVisualizationServer() + if server == nil { + http.Error(w, "DA visualization not available", http.StatusServiceUnavailable) + return + } + server.handleDAStats(w, r) + }) + + mux.HandleFunc("/da/health", func(w http.ResponseWriter, r *http.Request) { + server := GetDAVisualizationServer() + if server == nil { + http.Error(w, "DA visualization not available", http.StatusServiceUnavailable) + return + } + server.handleDAHealth(w, r) + }) + // Example for adding more custom endpoints: // mux.HandleFunc("/custom/myendpoint", func(w http.ResponseWriter, r *http.Request) { // // Your handler logic here diff --git a/pkg/rpc/server/templates/da_visualization.html b/pkg/rpc/server/templates/da_visualization.html new file mode 100644 index 000000000..d2167ff67 --- /dev/null +++ b/pkg/rpc/server/templates/da_visualization.html @@ -0,0 +1,219 @@ + + + +
---|
Timestamp | +Height | +Status | +Blobs | +Size (bytes) | +Gas Price | +Message | +
---|---|---|---|---|---|---|
{{if not .Timestamp.IsZero}}{{.Timestamp.Format "15:04:05"}}{{else}}--:--:--{{end}} | +{{.Height}} | +{{.StatusCode}} | +
+ {{.NumBlobs}}
+ {{if .BlobIDs}}
+
+ {{range .BlobIDs}}
+ {{slice . 0 8}}...
+ {{end}}
+
+ {{end}}
+ |
+ {{.BlobSize}} | +{{printf "%.6f" .GasPrice}} | +{{.Message}} | +
No submissions recorded yet. This aggregator node has not submitted any data to the DA layer yet.
+ {{end}} + {{else}} +This is a non-aggregator node. Non-aggregator nodes do not submit data to the DA layer and therefore do not have submission statistics, health metrics, or DA-related API endpoints available.
+Only aggregator nodes that actively produce blocks and submit data to the DA layer will display this information.
+ {{end}} + +Auto-refresh: 30s | Refresh Now
+