@@ -747,8 +747,8 @@ protected ExecType optFindExecType(boolean transitive) {
747
747
748
748
checkAndSetForcedPlatform ();
749
749
750
- DataType dt1 = getInput (). get (0 ).getDataType ();
751
- DataType dt2 = getInput (). get (1 ).getDataType ();
750
+ final DataType dt1 = getInput (0 ).getDataType ();
751
+ final DataType dt2 = getInput (1 ).getDataType ();
752
752
753
753
if ( _etypeForced != null ) {
754
754
setExecType (_etypeForced );
@@ -796,18 +796,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
796
796
checkAndSetInvalidCPDimsAndSize ();
797
797
}
798
798
799
- //spark-specific decision refinement (execute unary scalar w/ spark input and
799
+ // spark-specific decision refinement (execute unary scalar w/ spark input and
800
800
// single parent also in spark because it's likely cheap and reduces intermediates)
801
- if (transitive && _etype == ExecType .CP && _etypeForced != ExecType .CP && _etypeForced != ExecType .FED &&
802
- getDataType ().isMatrix () // output should be a matrix
803
- && (dt1 .isScalar () || dt2 .isScalar ()) // one side should be scalar
804
- && supportsMatrixScalarOperations () // scalar operations
805
- && !(getInput ().get (dt1 .isScalar () ? 1 : 0 ) instanceof DataOp ) // input is not checkpoint
806
- && getInput ().get (dt1 .isScalar () ? 1 : 0 ).getParent ().size () == 1 // unary scalar is only parent
807
- && !HopRewriteUtils .isSingleBlock (getInput ().get (dt1 .isScalar () ? 1 : 0 )) // single block triggered exec
808
- && getInput ().get (dt1 .isScalar () ? 1 : 0 ).optFindExecType () == ExecType .SPARK ) {
809
- // pull unary scalar operation into spark
810
- _etype = ExecType .SPARK ;
801
+ if (transitive // we allow transitive Spark operations. continue sequences of spark operations
802
+ && _etype == ExecType .CP // The instruction is currently in CP
803
+ && _etypeForced != ExecType .CP // not forced CP
804
+ && _etypeForced != ExecType .FED // not federated
805
+ && (getDataType ().isMatrix () || getDataType ().isFrame ()) // output should be a matrix or frame
806
+ ) {
807
+ final boolean v1 = getInput (0 ).isScalarOrVectorBellowBlockSize ();
808
+ final boolean v2 = getInput (1 ).isScalarOrVectorBellowBlockSize ();
809
+ final boolean left = v1 == true ; // left side is the vector or scalar
810
+ final Hop sparkIn = getInput (left ? 1 : 0 );
811
+ if ((v1 ^ v2 ) // XOR only one side is allowed to be a vector or a scalar.
812
+ && (supportsMatrixScalarOperations () || op == OpOp2 .APPLY_SCHEMA ) // supported operation
813
+ && sparkIn .getParent ().size () == 1 // only one parent
814
+ && !HopRewriteUtils .isSingleBlock (sparkIn ) // single block triggered exec
815
+ && sparkIn .optFindExecType () == ExecType .SPARK // input was spark op.
816
+ && !(sparkIn instanceof DataOp ) // input is not checkpoint
817
+ ) {
818
+ // pull operation into spark
819
+ _etype = ExecType .SPARK ;
820
+ }
811
821
}
812
822
813
823
if ( OptimizerUtils .ALLOW_BINARY_UPDATE_IN_PLACE &&
@@ -837,7 +847,7 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
837
847
|| (op == OpOp2 .RBIND && getDataType ().isList ())) {
838
848
_etype = ExecType .CP ;
839
849
}
840
-
850
+
841
851
//mark for recompile (forever)
842
852
setRequiresRecompileIfNecessary ();
843
853
0 commit comments