diff --git a/node/node_identities_test.go b/node/node_identities_test.go index adb9381812..c9e317d02b 100644 --- a/node/node_identities_test.go +++ b/node/node_identities_test.go @@ -29,7 +29,6 @@ func setupAppWithKeys(tb testing.TB, data ...[]byte) (*App, *observer.ObservedLo ))) cfg := getTestConfig(tb) app := New(WithLog(log.NewFromLog(logger)), WithConfig(&cfg)) - app.Config.DataDirParent = tb.TempDir() if len(data) == 0 { return app, observedLogs } diff --git a/sql/layers/layers.go b/sql/layers/layers.go index 56097430ce..f229db599a 100644 --- a/sql/layers/layers.go +++ b/sql/layers/layers.go @@ -385,6 +385,10 @@ func Get( stmt.ColumnBytes(4, layer.StateHash[:]) stmt.ColumnBytes(5, layer.AggregatedHash[:]) + if layer.AppliedBlock.IsEmpty() { + return true + } + inner := types.InnerBlock{} _, err := codec.DecodeFrom(stmt.ColumnReader(7), &inner) if err != nil { diff --git a/systest/tests/checkpoint_test.go b/systest/tests/checkpoint_test.go index a220ad2325..9c8e20771a 100644 --- a/systest/tests/checkpoint_test.go +++ b/systest/tests/checkpoint_test.go @@ -69,7 +69,14 @@ func TestCheckpoint(t *testing.T) { ctx, cancel := context.WithDeadline(tctx, deadline) defer cancel() require.NoError(t, sendTransactions(ctx, tctx.Log.Desugar(), cl, first, stop, receiver, 1, 100)) - require.NoError(t, waitLayer(tctx, cl.Client(0), snapshotLayer)) + + layerTime := cl.Genesis().Add(time.Duration(snapshotLayer) * layerDuration) + tctx.Log.Debugw("waiting for layer", "layer", snapshotLayer, "layer time", layerTime) + select { + case <-tctx.Done(): + require.FailNow(t, "test context is done") + case <-time.After(time.Until(layerTime)): + } tctx.Log.Debugw("getting account balances") before, err := getBalance(tctx, cl, snapshotLayer) diff --git a/systest/tests/common.go b/systest/tests/common.go index 86f8a4e04a..2bc0d6448d 100644 --- a/systest/tests/common.go +++ b/systest/tests/common.go @@ -44,13 +44,16 @@ func sendTransactions( client := cl.Client(i % cl.Total()) nonce, err := getNonce(ctx, client, cl.Address(i)) if err != nil { - return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i), err) + return fmt.Errorf("get nonce failed (%s: %s): %w", client.Name, cl.Address(i).String(), err) } - watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= stop { + watchLayers(ctx, eg, client, logger, func(layer *pb2.Layer) (bool, error) { + if layer.Number < first { + return true, nil + } + if layer.Number >= stop { return false, nil } - if layer.Layer.Status != pb.Layer_LAYER_STATUS_APPROVED || layer.Layer.Number.Number < first { + if layer.Status != pb2.Layer_LAYER_STATUS_APPLIED { return true, nil } // give some time for a previous layer to be applied @@ -62,14 +65,19 @@ func sendTransactions( zap.String("client", client.Name), zap.Stringer("address", cl.Address(i)), ) - if err := submitSpawn(ctx, cl, i, client); err != nil { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if _, err := submitTransaction(ctx, + wallet.SelfSpawn(cl.Private(i), 0, sdk.WithGenesisID(cl.GenesisID())), + client, + ); err != nil { return false, fmt.Errorf("failed to spawn %w", err) } nonce++ return true, nil } logger.Debug("submitting transactions", - zap.Uint32("layer", layer.Layer.Number.Number), + zap.Uint32("layer", layer.Number), zap.String("client", client.Name), zap.Stringer("address", cl.Address(i)), zap.Uint64("nonce", nonce), @@ -102,67 +110,17 @@ func sendTransactions( } func submitTransaction(ctx 
context.Context, tx []byte, node *cluster.NodeClient) ([]byte, error) { - txclient := pb.NewTransactionServiceClient(node.PubConn()) + client := pb2.NewTransactionServiceClient(node.PubConn()) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - response, err := txclient.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{Transaction: tx}) + resp, err := client.SubmitTransaction(ctx, &pb2.SubmitTransactionRequest{Transaction: tx}) if err != nil { return nil, err } - if response.Txstate == nil { - return nil, errors.New("tx state should not be nil") - } - return response.Txstate.Id.Id, nil -} - -func stateHashStream( - ctx context.Context, - node *cluster.NodeClient, - logger *zap.Logger, - collector func(*pb.GlobalStateStreamResponse) (bool, error), -) error { - retries := 0 -BACKOFF: - stateapi := pb.NewGlobalStateServiceClient(node.PubConn()) - states, err := stateapi.GlobalStateStream(ctx, &pb.GlobalStateStreamRequest{ - GlobalStateDataFlags: uint32(pb.GlobalStateDataFlag_GLOBAL_STATE_DATA_FLAG_GLOBAL_STATE_HASH), - }) - if err != nil { - return err - } - for { - state, err := states.Recv() - s, ok := status.FromError(err) - if !ok { - if ctx.Err() != nil { - return ctx.Err() - } - return fmt.Errorf("unknown error: %w", err) - } - switch s.Code() { - case codes.OK: - if cont, err := collector(state); !cont { - return err - } - case codes.Canceled, codes.DeadlineExceeded: - return nil - case codes.Unavailable: - if retries == attempts { - return errors.New("state stream unavailable") - } - retries++ - time.Sleep(retryBackoff) - goto BACKOFF - default: - logger.Warn( - "global state stream error", - zap.String("client", node.Name), - zap.Error(err), - zap.Any("status", s), - ) - return fmt.Errorf("stream err from client %v: %w", node.Name, err) - } + if resp.TxId == nil { + return nil, errors.New("tx id should not be nil") } + return resp.TxId, nil } func watchLayers( @@ -170,7 +128,7 @@ func watchLayers( eg *errgroup.Group, node *cluster.NodeClient, logger *zap.Logger, - collector func(*pb.LayerStreamResponse) (bool, error), + collector func(*pb2.Layer) (bool, error), ) { eg.Go(func() error { return layersStream(ctx, node, logger, collector) @@ -181,17 +139,20 @@ func layersStream( ctx context.Context, node *cluster.NodeClient, logger *zap.Logger, - collector func(*pb.LayerStreamResponse) (bool, error), + collector func(*pb2.Layer) (bool, error), ) error { retries := 0 BACKOFF: - meshapi := pb.NewMeshServiceClient(node.PubConn()) - layers, err := meshapi.LayerStream(ctx, &pb.LayerStreamRequest{}) + client := pb2.NewLayerStreamServiceClient(node.PrivConn()) + stream, err := client.Stream(ctx, &pb2.LayerStreamRequest{ + Watch: true, + }) if err != nil { - return err + return fmt.Errorf("streaming layers for %s: %w", node.Name, err) } + defer stream.CloseSend() for { - layer, err := layers.Recv() + layer, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -233,10 +194,12 @@ func malfeasanceStream( ) error { retries := 0 BACKOFF: - malapi := pb2.NewMalfeasanceStreamServiceClient(node.PrivConn()) - proofs, err := malapi.Stream(ctx, &pb2.MalfeasanceStreamRequest{Watch: true}) + client := pb2.NewMalfeasanceStreamServiceClient(node.PrivConn()) + proofs, err := client.Stream(ctx, &pb2.MalfeasanceStreamRequest{ + Watch: true, + }) if err != nil { - return err + return fmt.Errorf("streaming malfeasance for %s: %w", node.Name, err) } defer proofs.CloseSend() for { @@ -275,58 +238,20 @@ BACKOFF: } } -func waitGenesis(ctx *testcontext.Context, 
node *cluster.NodeClient) error { - svc := pb.NewMeshServiceClient(node.PubConn()) - resp, err := svc.GenesisTime(ctx, &pb.GenesisTimeRequest{}) - if err != nil { - return err - } - genesis := time.Unix(int64(resp.Unixtime.Value), 0) - now := time.Now() - if !genesis.After(now) { - return nil - } - ctx.Log.Debugw("waiting for genesis", "now", now, "genesis", genesis) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(genesis.Sub(now)): - return nil - } -} - -func waitLayer(ctx *testcontext.Context, node *cluster.NodeClient, lid uint32) error { - svc := pb.NewMeshServiceClient(node.PubConn()) - resp, err := svc.GenesisTime(ctx, &pb.GenesisTimeRequest{}) - if err != nil { - return err - } - lyrTime := time.Unix(int64(resp.Unixtime.Value), 0). - Add(time.Duration(lid) * testcontext.LayerDuration.Get(ctx.Parameters)) - - now := time.Now() - if !lyrTime.After(now) { - return nil - } - ctx.Log.Debugw("waiting for layer", "now", now, "layer time", lyrTime, "layer", lid) - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(lyrTime.Sub(now)): - return nil - } -} - -func waitTransaction(ctx context.Context, eg *errgroup.Group, client *cluster.NodeClient, id []byte) { +func waitTransaction(ctx context.Context, eg *errgroup.Group, node *cluster.NodeClient, id []byte) { eg.Go(func() error { - api := pb.NewTransactionServiceClient(client.PubConn()) - rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true, Id: id}) + client := pb2.NewTransactionStreamServiceClient(node.PrivConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Watch: true, + Txid: [][]byte{id}, + }) if err != nil { return err } - _, err = rsts.Recv() + defer stream.CloseSend() + _, err = stream.Recv() if err != nil { - return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) + return fmt.Errorf("stream error on receiving result %s: %w", node.Name, err) } return nil }) @@ -334,19 +259,22 @@ func waitTransaction(ctx context.Context, eg *errgroup.Group, client *cluster.No func watchTransactionResults( ctx context.Context, - client *cluster.NodeClient, + node *cluster.NodeClient, log *zap.Logger, - collector func(*pb.TransactionResult) (bool, error), + collector func(*pb2.TransactionResponse) (bool, error), ) error { retries := 0 BACKOFF: - api := pb.NewTransactionServiceClient(client.PubConn()) - rsts, err := api.StreamResults(ctx, &pb.TransactionResultsRequest{Watch: true}) + client := pb2.NewTransactionStreamServiceClient(node.PrivConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Watch: true, + }) if err != nil { - return err + return fmt.Errorf("streaming transactions for %s: %w", node.Name, err) } + defer stream.CloseSend() for { - rst, err := rsts.Recv() + rst, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -371,11 +299,11 @@ BACKOFF: default: log.Warn( "transactions stream error", - zap.String("client", client.Name), + zap.String("client", node.Name), zap.Error(err), zap.Any("status", s), ) - return fmt.Errorf("stream error on receiving result %s: %w", client.Name, err) + return fmt.Errorf("stream error on receiving result %s: %w", node.Name, err) } } } @@ -383,20 +311,21 @@ BACKOFF: func watchProposals( ctx context.Context, eg *errgroup.Group, - client *cluster.NodeClient, + node *cluster.NodeClient, log *zap.Logger, collector func(*pb.Proposal) (bool, error), ) { eg.Go(func() error { retries := 0 BACKOFF: - dbg := pb.NewDebugServiceClient(client.PrivConn()) - 
proposals, err := dbg.ProposalsStream(ctx, &emptypb.Empty{}) + client := pb.NewDebugServiceClient(node.PrivConn()) + stream, err := client.ProposalsStream(ctx, &emptypb.Empty{}) if err != nil { - return fmt.Errorf("proposal stream for %s: %w", client.Name, err) + return fmt.Errorf("streaming proposals for %s: %w", node.Name, err) } + defer stream.CloseSend() for { - proposal, err := proposals.Recv() + proposal, err := stream.Recv() s, ok := status.FromError(err) if !ok { if ctx.Err() != nil { @@ -421,11 +350,11 @@ func watchProposals( default: log.Warn( "proposals stream error", - zap.String("client", client.Name), + zap.String("client", node.Name), zap.Error(err), zap.Any("status", s), ) - return fmt.Errorf("proposal event for %s: %w", client.Name, err) + return fmt.Errorf("proposal event for %s: %w", node.Name, err) } } }) @@ -444,15 +373,17 @@ func scheduleChaos( action func(context.Context) (chaos.Teardown, error), ) { var teardown chaos.Teardown - watchLayers(ctx, eg, client, logger, func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number == from && teardown == nil { + watchLayers(ctx, eg, client, logger, func(layer *pb2.Layer) (bool, error) { + switch { + case layer.Number < from: + return true, nil + case layer.Number == from && teardown == nil: var err error teardown, err = action(ctx) if err != nil { return false, err } - } - if layer.Layer.Number.Number == to { + case layer.Number >= to: if err := teardown(ctx); err != nil { return false, err } @@ -464,9 +395,9 @@ func scheduleChaos( func currentLayer(ctx context.Context, tb testing.TB, client *cluster.NodeClient) uint32 { tb.Helper() - response, err := pb.NewMeshServiceClient(client.PubConn()).CurrentLayer(ctx, &pb.CurrentLayerRequest{}) + resp, err := pb2.NewNodeServiceClient(client.PubConn()).Status(ctx, &pb2.NodeStatusRequest{}) require.NoError(tb, err) - return response.Layernum.Number + return resp.CurrentLayer } func waitAll(tctx *testcontext.Context, cl *cluster.Cluster) error { @@ -486,32 +417,25 @@ func nextFirstLayer(current, size uint32) uint32 { return current } -func getNonce(ctx context.Context, client *cluster.NodeClient, address types.Address) (uint64, error) { - gstate := pb.NewGlobalStateServiceClient(client.PubConn()) - resp, err := gstate.Account(ctx, &pb.AccountRequest{AccountId: &pb.AccountId{Address: address.String()}}) +func getNonce(ctx context.Context, node *cluster.NodeClient, address types.Address) (uint64, error) { + resp, err := pb2.NewAccountServiceClient(node.PubConn()).List(ctx, &pb2.AccountRequest{ + Addresses: []string{address.String()}, + Limit: 1, + }) if err != nil { return 0, err } - return resp.AccountWrapper.StateProjected.Counter, nil + return resp.Accounts[0].Current.Counter, nil } -func currentBalance(ctx context.Context, client *cluster.NodeClient, address types.Address) (uint64, error) { - gstate := pb.NewGlobalStateServiceClient(client.PubConn()) - resp, err := gstate.Account(ctx, &pb.AccountRequest{AccountId: &pb.AccountId{Address: address.String()}}) +func currentBalance(ctx context.Context, node *cluster.NodeClient, address types.Address) (uint64, error) { + resp, err := pb2.NewAccountServiceClient(node.PubConn()).List(ctx, &pb2.AccountRequest{ + Addresses: []string{address.String()}, + }) if err != nil { return 0, err } - return resp.AccountWrapper.StateCurrent.Balance.Value, nil -} - -func submitSpawn(ctx context.Context, cluster *cluster.Cluster, account int, client *cluster.NodeClient) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) 
- defer cancel() - _, err := submitTransaction(ctx, - wallet.SelfSpawn(cluster.Private(account), 0, sdk.WithGenesisID(cluster.GenesisID())), - client, - ) - return err + return resp.Accounts[0].Current.Balance, nil } func submitSpend( @@ -543,38 +467,37 @@ func syncedNodes(ctx context.Context, cl *cluster.Cluster) []*cluster.NodeClient func isSynced(ctx context.Context, node *cluster.NodeClient) bool { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - svc := pb.NewNodeServiceClient(node.PubConn()) - resp, err := svc.Status(ctx, &pb.StatusRequest{}) + svc := pb2.NewNodeServiceClient(node.PubConn()) + resp, err := svc.Status(ctx, &pb2.NodeStatusRequest{}) if err != nil { return false } - return resp.Status.IsSynced + return resp.Status == pb2.NodeStatusResponse_SYNC_STATUS_SYNCED } -func getLayer(ctx context.Context, node *cluster.NodeClient, lid uint32) (*pb.Layer, error) { +func getLayer(ctx context.Context, node *cluster.NodeClient, lid uint32) (*pb2.Layer, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - layer := &pb.LayerNumber{Number: lid} - msvc := pb.NewMeshServiceClient(node.PubConn()) - lresp, err := msvc.LayersQuery(ctx, &pb.LayersQueryRequest{StartLayer: layer, EndLayer: layer}) + client := pb2.NewLayerServiceClient(node.PubConn()) + resp, err := client.List(ctx, &pb2.LayerRequest{StartLayer: lid, EndLayer: lid}) if err != nil { return nil, err } - if len(lresp.Layer) != 1 { - return nil, fmt.Errorf("request was made for one layer (%d)", layer.Number) + if len(resp.Layers) != 1 { + return nil, fmt.Errorf("request was made for one layer (%d)", lid) } - return lresp.Layer[0], nil + return resp.Layers[0], nil } -func getVerifiedLayer(ctx context.Context, node *cluster.NodeClient) (*pb.Layer, error) { +func getVerifiedLayer(ctx context.Context, node *cluster.NodeClient) (*pb2.Layer, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - svc := pb.NewNodeServiceClient(node.PubConn()) - resp, err := svc.Status(ctx, &pb.StatusRequest{}) + client := pb2.NewNodeServiceClient(node.PubConn()) + resp, err := client.Status(ctx, &pb2.NodeStatusRequest{}) if err != nil { return nil, err } - return getLayer(ctx, node, resp.Status.VerifiedLayer.Number) + return getLayer(ctx, node, resp.AppliedLayer) } type txClient struct { @@ -606,20 +529,21 @@ type txRequest struct { node *cluster.NodeClient txid []byte - rst *pb.TransactionResult + rst *pb2.TransactionResponse } func (r *txRequest) wait(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - client := pb.NewTransactionServiceClient(r.node.PubConn()) - stream, err := client.StreamResults(ctx, &pb.TransactionResultsRequest{ - Id: r.txid, + client := pb2.NewTransactionStreamServiceClient(r.node.PrivConn()) + stream, err := client.Stream(ctx, &pb2.TransactionStreamRequest{ + Txid: [][]byte{r.txid}, Watch: true, }) if err != nil { return err } + defer stream.CloseSend() rst, err := stream.Recv() if err != nil { return err @@ -628,17 +552,18 @@ func (r *txRequest) wait(ctx context.Context) error { return nil } -func (r *txRequest) result(ctx context.Context) (*pb.TransactionResult, error) { +func (r *txRequest) result(ctx context.Context) (*pb2.TransactionResponse, error) { if r.rst != nil { return r.rst, nil } - client := pb.NewTransactionServiceClient(r.node.PubConn()) - stream, err := client.StreamResults(ctx, &pb.TransactionResultsRequest{ - Id: r.txid, + client := pb2.NewTransactionStreamServiceClient(r.node.PrivConn()) + stream, err := 
client.Stream(ctx, &pb2.TransactionStreamRequest{ + Txid: [][]byte{r.txid}, }) if err != nil { return nil, err } + defer stream.CloseSend() rst, err := stream.Recv() if err != nil { // eof without result - transaction wasn't applied yet diff --git a/systest/tests/distributed_post_verification_test.go b/systest/tests/distributed_post_verification_test.go index 3ee8adf6f7..b0ec02f87f 100644 --- a/systest/tests/distributed_post_verification_test.go +++ b/systest/tests/distributed_post_verification_test.go @@ -12,8 +12,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" - pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/spacemeshos/go-scale" "github.com/spacemeshos/post/shared" "github.com/spacemeshos/post/verifying" @@ -365,7 +364,7 @@ func testPostMalfeasance( var ( atx builtAtx - expectedDomain pb2.MalfeasanceProof_MalfeasanceDomain + expectedDomain pb.MalfeasanceProof_MalfeasanceDomain expectedType uint32 ) expectedProperties := make(map[string]string) @@ -382,7 +381,7 @@ func testPostMalfeasance( } watx.Sign(signer) atx = watx - expectedDomain = pb2.MalfeasanceProof_DOMAIN_UNSPECIFIED + expectedDomain = pb.MalfeasanceProof_DOMAIN_UNSPECIFIED expectedType = 4 expectedProperties["atx"] = atx.ID().String() case types.AtxV2: @@ -413,7 +412,7 @@ func testPostMalfeasance( } watx.Sign(signer) atx = watx - expectedDomain = pb2.MalfeasanceProof_DOMAIN_ACTIVATION + expectedDomain = pb.MalfeasanceProof_DOMAIN_ACTIVATION expectedType = 0 expectedProperties["type"] = "InvalidPoSTProof" expectedProperties["atx"] = atx.ID().String() @@ -428,9 +427,9 @@ func testPostMalfeasance( zap.Uint32("epoch", publishEpoch.Uint32()), zap.Uint32("layer", publishEpoch.FirstLayer().Uint32()), ) - err = layersStream(ctx, cl.Client(0), logger, func(resp *pb.LayerStreamResponse) (bool, error) { - logger.Info("new layer", zap.Uint32("layer", resp.Layer.Number.Number)) - return resp.Layer.Number.Number < publishEpoch.FirstLayer().Uint32(), nil + err = layersStream(ctx, cl.Client(0), logger, func(resp *pb.Layer) (bool, error) { + logger.Info("new layer", zap.Uint32("layer", resp.Number)) + return resp.Number < publishEpoch.FirstLayer().Uint32(), nil }) require.NoError(t, err) @@ -458,7 +457,7 @@ func testPostMalfeasance( // 5. 
Wait for POST malfeasance proof receivedProof := false logger.Info("waiting for malfeasance proof", zap.Duration("timeout", timeout)) - err = malfeasanceStream(publishCtx, cl.Client(0), logger, func(proof *pb2.MalfeasanceProof) (bool, error) { + err = malfeasanceStream(publishCtx, cl.Client(0), logger, func(proof *pb.MalfeasanceProof) (bool, error) { if !bytes.Equal(proof.GetSmesher(), signer.NodeID().Bytes()) { return true, nil } diff --git a/systest/tests/equivocation_test.go b/systest/tests/equivocation_test.go index c3df39bfa3..43f3f62ffe 100644 --- a/systest/tests/equivocation_test.go +++ b/systest/tests/equivocation_test.go @@ -7,8 +7,7 @@ import ( "time" "github.com/oasisprotocol/curve25519-voi/primitives/ed25519" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" - pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -89,15 +88,18 @@ func TestEquivocation(t *testing.T) { for i := 0; i < cl.Total(); i++ { client := cl.Client(i) results[client.Name] = make(map[int]string) - watchLayers(cctx, &eg, client, cctx.Log.Desugar(), func(resp *pb.LayerStreamResponse) (bool, error) { - if resp.Layer.Status != pb.Layer_LAYER_STATUS_APPLIED { + watchLayers(cctx, &eg, client, cctx.Log.Desugar(), func(resp *pb.Layer) (bool, error) { + if resp.Number < startTest { return true, nil } - if resp.Layer.Number.Number > stopTest { + if resp.Status != pb.Layer_LAYER_STATUS_VERIFIED { + return true, nil + } + if resp.Number > stopTest { return false, nil } - num := int(resp.Layer.Number.Number) - consensus := types.BytesToHash(resp.Layer.Hash).ShortString() + num := int(resp.Number) + consensus := types.BytesToHash(resp.StateHash).ShortString() cctx.Log.Debugw("consensus hash collected", "client", client.Name, "layer", num, @@ -126,7 +128,7 @@ func TestEquivocation(t *testing.T) { proofs := make([]types.NodeID, 0, len(malfeasants)) ctx, cancel := context.WithTimeout(t.Context(), 20*time.Second) defer cancel() - malfeasanceStream(ctx, client, cctx.Log.Desugar(), func(proof *pb2.MalfeasanceProof) (bool, error) { + malfeasanceStream(ctx, client, cctx.Log.Desugar(), func(proof *pb.MalfeasanceProof) (bool, error) { malfeasant := proof.GetSmesher() proofs = append(proofs, types.BytesToNodeID(malfeasant)) return len(proofs) < len(malfeasants), nil diff --git a/systest/tests/nodes_test.go b/systest/tests/nodes_test.go index 63f7f7ac2f..0536d6fc8b 100644 --- a/systest/tests/nodes_test.go +++ b/systest/tests/nodes_test.go @@ -6,6 +6,7 @@ import ( "testing" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -38,11 +39,11 @@ func TestAddNodes(t *testing.T) { tctx.Log.Info("cluster size changed to ", tctx.ClusterSize) var eg errgroup.Group - watchLayers(tctx, &eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= beforeAdding { + watchLayers(tctx, &eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + if layer.Number >= beforeAdding { tctx.Log.Debugw("adding new smeshers", "n", addedLater, - "layer", layer.Layer.Number, + "layer", layer.Number, ) // the new smeshers will use the old sync protocol return false, cl.AddSmeshers(tctx, addedLater) @@ -136,22 +137,22 @@ func 
TestFailedNodes(t *testing.T) { } for i := range cl.Total() - failed { client := cl.Client(i) - watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { + watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + if layer.Status == pb2.Layer_LAYER_STATUS_VERIFIED { tctx.Log.Debugw( "layer applied", "client", client.Name, "layer", - layer.Layer.Number.Number, + layer.Number, "hash", - prettyHex(layer.Layer.Hash), + prettyHex(layer.StateHash), ) - if layer.Layer.Number.Number == stopLayer { - return false, nil + if layer.Number <= lastLayer { + hashes[i][layer.Number] = prettyHex(layer.StateHash) } - if layer.Layer.Number.Number <= lastLayer { - hashes[i][layer.Layer.Number.Number] = prettyHex(layer.Layer.Hash) + if layer.Number >= stopLayer { + return false, nil } } return true, nil diff --git a/systest/tests/partition_test.go b/systest/tests/partition_test.go index 06e393e0e5..4aa317f207 100644 --- a/systest/tests/partition_test.go +++ b/systest/tests/partition_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -84,31 +84,29 @@ func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster numLayers := stop - types.GetEffectiveGenesis().Uint32() // assuming each client can update state for the same layer up to 10 times stateCh := make(chan *stateUpdate, uint32(cl.Total())*numLayers*10) - tctx.Log.Debug("listening to state hashes...") + tctx.Log.Debug("listening to layers for state hashes...") for i := range cl.Total() { node := cl.Client(i) eg.Go(func() error { - err := stateHashStream(ctx, node, tctx.Log.Desugar(), - func(state *pb.GlobalStateStreamResponse) (bool, error) { - data := state.Datum.Datum - require.IsType(tb, &pb.GlobalStateData_GlobalState{}, data) - - resp := data.(*pb.GlobalStateData_GlobalState) - layer := resp.GlobalState.Layer.Number - if layer > stop { + err := layersStream(ctx, node, tctx.Log.Desugar(), + func(layer *pb.Layer) (bool, error) { + if layer.Number < first { + return true, nil + } + if layer.Number > stop { return false, nil } - stateHash := types.BytesToHash(resp.GlobalState.RootHash) + stateHash := types.BytesToHash(layer.StateHash) tctx.Log.Debugw("state hash collected", "client", node.Name, - "layer", layer, + "layer", layer.Number, "state", stateHash.ShortString(), ) select { case stateCh <- &stateUpdate{ - layer: layer, + layer: layer.Number, hash: stateHash, client: node.Name, }: // continue @@ -126,7 +124,7 @@ func testPartition(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster }, ) if err != nil { - return fmt.Errorf("state hash stream error for %s: %w", node.Name, err) + return fmt.Errorf("layer stream error for %s: %w", node.Name, err) } return nil }) diff --git a/systest/tests/poets_test.go b/systest/tests/poets_test.go index 044b559946..c9fbebf370 100644 --- a/systest/tests/poets_test.go +++ b/systest/tests/poets_test.go @@ -8,6 +8,7 @@ import ( "testing" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -71,25 +72,27 @@ func TestPoetsFailures(t *testing.T) { }) } - watchLayers(ctx, eg, 
cl.Client(0), tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { + watchLayers(ctx, eg, cl.Client(0), tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { // Will kill a poet from time to time - if layer.Layer.Number.Number > last { + if layer.Number < first { + return true, nil + } + if layer.Number > last { tctx.Log.Debug("Poet killer is done") return false, nil } - if layer.Layer.GetStatus() != pb.Layer_LAYER_STATUS_APPLIED { + if layer.Status != pb2.Layer_LAYER_STATUS_VERIFIED { return true, nil } // don't kill a poet if this is not ~middle of epoch - if ((layer.Layer.GetNumber().GetNumber() + layersPerEpoch/2) % layersPerEpoch) != 0 { + if ((layer.Number + layersPerEpoch/2) % layersPerEpoch) != 0 { return true, nil } poetToDelete := cl.Poet(0) - tctx.Log.Debugw("deleting poet pod", "poet", poetToDelete.Name, "layer", layer.Layer.GetNumber().GetNumber()) + tctx.Log.Debugw("deleting poet pod", "poet", poetToDelete.Name, "layer", layer.Number) require.NoError(t, cl.DeletePoet(tctx, 0)) require.NoError(t, cl.AddPoet(tctx)) - return true, nil }) diff --git a/systest/tests/smeshing_test.go b/systest/tests/smeshing_test.go index 2d13a3f494..860bc021b7 100644 --- a/systest/tests/smeshing_test.go +++ b/systest/tests/smeshing_test.go @@ -12,6 +12,7 @@ import ( "github.com/oasisprotocol/curve25519-voi/primitives/ed25519" pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb2 "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -211,8 +212,8 @@ func testVesting(tb testing.TB, tctx *testcontext.Context, cl *cluster.Cluster, client := cl.Client(i % cl.Total()) eg.Go(func() error { var subeg errgroup.Group - watchLayers(tctx, &subeg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - return layer.Layer.Number.Number < uint32(acc.start), nil + watchLayers(tctx, &subeg, client, tctx.Log.Desugar(), func(layer *pb2.Layer) (bool, error) { + return layer.Number < uint32(acc.start), nil }) if err := subeg.Wait(); err != nil { return err diff --git a/systest/tests/steps_test.go b/systest/tests/steps_test.go index 5d284ee264..85399f883a 100644 --- a/systest/tests/steps_test.go +++ b/systest/tests/steps_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -100,7 +100,14 @@ func TestStepTransactions(t *testing.T) { tctx := testcontext.New(t, testcontext.SkipClusterLimits()) cl, err := cluster.Reuse(tctx, cluster.WithKeys(tctx.ClusterSize)) require.NoError(t, err) - require.NoError(t, waitGenesis(tctx, cl.Client(0))) + + tctx.Log.Debugw("waiting for genesis", "genesis time", cl.Genesis()) + select { + case <-tctx.Done(): + require.FailNow(t, "context canceled") + case <-time.After(time.Until(cl.Genesis())): // wait for genesis + } + t.Cleanup(cl.CloseClients) clients := make([]*txClient, cl.Accounts()) @@ -142,8 +149,7 @@ func TestStepTransactions(t *testing.T) { if err != nil { return err } - - tctx.Log.Debugw("spawned wallet", "address", client.account, "layer", rst.Layer) + tctx.Log.Debugw("spawned wallet", "address", client.account, "layer", rst.TxResult.Layer) } tctx.Log.Debugw("submitting transactions", "address", client.account, @@ -223,9 +229,9 @@ func TestStepVerifyConsistency(t *testing.T) { require.NoError(t, err) 
cctx.Log.Debugw("using verified layer as a reference", "node", synced[0].Name, - "layer", reference.Number.Number, - "hash", prettyHex(reference.Hash), - "state hash", prettyHex(reference.RootStateHash), + "layer", reference.Number, + "hash", prettyHex(reference.StateHash), + "state hash", prettyHex(reference.CumulativeStateHash), ) layers := make([]*pb.Layer, len(synced)) @@ -235,22 +241,22 @@ func TestStepVerifyConsistency(t *testing.T) { var eg errgroup.Group for i, node := range synced[1:] { eg.Go(func() error { - layer, err := getLayer(cctx, node, reference.Number.Number) + layer, err := getLayer(cctx, node, reference.Number) if err != nil { return err } layers[i] = layer - if !bytes.Equal(layer.Hash, reference.Hash) { + if !bytes.Equal(layer.StateHash, reference.StateHash) { return fmt.Errorf("hash doesn't match reference %s in layer %d: %x != %x", - node.Name, reference.Number.Number, layer.Hash, reference.Hash) + node.Name, reference.Number, layer.StateHash, reference.StateHash) } - if !bytes.Equal(layer.RootStateHash, reference.RootStateHash) { + if !bytes.Equal(layer.CumulativeStateHash, reference.CumulativeStateHash) { return fmt.Errorf( "state hash doesn't match reference %s in layer %d: %x != %x", node.Name, - reference.Number.Number, - layer.RootStateHash, - reference.RootStateHash, + reference.Number, + layer.CumulativeStateHash, + reference.CumulativeStateHash, ) } return nil @@ -266,12 +272,12 @@ func TestStepVerifyConsistency(t *testing.T) { if i == 0 { continue } - require.NotNil(t, layer, "client %s doesn't have layer %d", - synced[i].Name, reference.Number) - require.Equal(t, reference.Hash, layer.Hash, "consensus hash on client %s", - synced[i].Name) - require.Equal(t, reference.RootStateHash, layer.RootStateHash, "state hash on client %s", - synced[i].Name) + require.NotNil(t, layer, "client %s doesn't have layer %d", synced[i].Name, reference.Number) + require.Equal(t, reference.ConsensusHash, layer.ConsensusHash, "consensus hash on client %s", synced[i].Name) + require.Equal(t, + reference.CumulativeStateHash, layer.CumulativeStateHash, + "state hash on client %s", synced[i].Name, + ) } } diff --git a/systest/tests/timeskew_test.go b/systest/tests/timeskew_test.go index f47b8e9047..ece3de7367 100644 --- a/systest/tests/timeskew_test.go +++ b/systest/tests/timeskew_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -70,15 +70,18 @@ func TestShortTimeSkew(t *testing.T) { // abstain on one or two layers. 
in such case longer delay might be necessary to confirm that layer var confirmed uint32 - watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.LayerStreamResponse) (bool, error) { - if layer.Layer.Number.Number >= stopTest { + watchLayers(ctx, eg, client, tctx.Log.Desugar(), func(layer *pb.Layer) (bool, error) { + if layer.Number < enableSkew { + return true, nil + } + if layer.Number >= stopTest { return false, nil } - if layer.Layer.Status == pb.Layer_LAYER_STATUS_APPLIED { + if layer.Status == pb.Layer_LAYER_STATUS_VERIFIED { tctx.Log.Debugw( - "layer applied", "layer", layer.Layer.Number.Number, "hash", prettyHex(layer.Layer.Hash), + "layer applied", "layer", layer.Number, "hash", prettyHex(layer.StateHash), ) - confirmed = layer.Layer.Number.Number + confirmed = layer.Number if confirmed >= stopSkew { return false, nil } diff --git a/systest/tests/transactions_test.go b/systest/tests/transactions_test.go index d8d2bece6d..003db2a2e5 100644 --- a/systest/tests/transactions_test.go +++ b/systest/tests/transactions_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - pb "github.com/spacemeshos/api/release/go/spacemesh/v1" + pb "github.com/spacemeshos/api/release/go/spacemesh/v2beta1" "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -24,7 +24,7 @@ func testTransactions( sendFor uint32, ) { var ( - // start sending transactions after two layers or after genesis + // start sending transactions now + two layers but not earlier than layer 8 (effective genesis) first = max(currentLayer(tctx, tb, cl.Client(0))+2, 8) stop = first + sendFor batch = 10 @@ -40,13 +40,12 @@ func testTransactions( "expected transactions", expectedCount, ) receiver := types.GenerateAddress([]byte{11, 1, 1}) - state := pb.NewGlobalStateServiceClient(cl.Client(0).PubConn()) - response, err := state.Account( - tctx, - &pb.AccountRequest{AccountId: &pb.AccountId{Address: receiver.String()}}, - ) + state := pb.NewAccountServiceClient(cl.Client(0).PubConn()) + response, err := state.List(tctx, &pb.AccountRequest{ + Addresses: []string{receiver.String()}, + }) require.NoError(tb, err) - before := response.AccountWrapper.StateCurrent.Balance + before := response.Accounts[0].Current.Balance layerDuration := testcontext.LayerDuration.Get(tctx.Parameters) layersPerEpoch := uint32(testcontext.LayersPerEpoch.Get(tctx.Parameters)) @@ -63,11 +62,11 @@ func testTransactions( client := cl.Client(i) eg.Go(func() error { err := watchTransactionResults(ctx, client, tctx.Log.Desugar(), - func(rst *pb.TransactionResult) (bool, error) { + func(rst *pb.TransactionResponse) (bool, error) { txs[i] = append(txs[i], rst.Tx) count := len(txs[i]) - tctx.Log.Desugar().Debug("received transaction client", - zap.Uint32("layer", rst.Layer), + tctx.Log.Desugar().Debug("received transaction", + zap.Uint32("layer", rst.TxResult.Layer), zap.String("client", client.Name), zap.String("tx", "0x"+hex.EncodeToString(rst.Tx.Id)), zap.Int("count", count), @@ -99,20 +98,18 @@ func testTransactions( diff := batch * amount * int(sendFor-1) * cl.Accounts() for i := 0; i < cl.Total(); i++ { client := cl.Client(i) - state := pb.NewGlobalStateServiceClient(client.PubConn()) - response, err := state.Account( - tctx, - &pb.AccountRequest{AccountId: &pb.AccountId{Address: receiver.String()}}, - ) + state := pb.NewAccountServiceClient(client.PubConn()) + response, err := state.List(tctx, &pb.AccountRequest{ + Addresses: []string{receiver.String()}, + }) require.NoError(tb, err) - after := 
response.AccountWrapper.StateCurrent.Balance + after := response.Accounts[0].Current.Balance tctx.Log.Infow("receiver state", - "before", before.Value, - "after", after.Value, + "before", before, + "after", after, "expected-diff", diff, - "diff", after.Value-before.Value, + "diff", after-before, ) - require.Equal(tb, int(before.Value)+diff, - int(response.AccountWrapper.StateCurrent.Balance.Value), "client=%s", client.Name) + require.Equal(tb, int(before)+diff, int(after), "client=%s", client.Name) } }
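
For reviewers less familiar with the v2beta1 surface this change migrates to, the sketch below shows the collector shape the reworked helpers expect: a `func(*pb2.Layer) (bool, error)` callback that keeps the stream open until a target layer reaches the desired status. It is a minimal illustration only, reusing the `layersStream` helper and the `pb2` (spacemesh/v2beta1) types from `systest/tests/common.go` in this diff; `waitForApplied` is a hypothetical name and is not part of the patch.

```go
// Minimal sketch (not part of the patch): block until `target` is applied,
// driven by the v2beta1 layer stream that the migrated helpers use.
func waitForApplied(ctx context.Context, node *cluster.NodeClient, logger *zap.Logger, target uint32) error {
	return layersStream(ctx, node, logger, func(layer *pb2.Layer) (bool, error) {
		if layer.Number < target {
			return true, nil // too early, keep watching
		}
		if layer.Status != pb2.Layer_LAYER_STATUS_APPLIED {
			return true, nil // target layer seen, but not applied yet
		}
		logger.Info("layer applied",
			zap.Uint32("layer", layer.Number),
			zap.String("state", types.BytesToHash(layer.StateHash).ShortString()),
		)
		return false, nil // stop the stream
	})
}
```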