Commit c35c129 (parent: eafe1e2)
[DO NOT MERGE][skip ci] JAVA 17 BWARE COMMIT

14 files changed (+278, -91 lines)

src/main/java/org/apache/sysds/hops/AggBinaryOp.java (+1, -2)

@@ -439,8 +439,7 @@ private boolean isApplicableForTransitiveSparkExecType(boolean left)
 			|| (left && !isLeftTransposeRewriteApplicable(true)))
 		&& getInput(index).getParent().size()==1 //bagg is only parent
 		&& !getInput(index).areDimsBelowThreshold()
-		&& (getInput(index).optFindExecType() == ExecType.SPARK
-			|| (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD()))
+		&& getInput(index).hasSparkOutput()
 		&& getInput(index).getOutputMemEstimate()>getOutputMemEstimate();
 	}

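Note: the removed disjunction is not deleted logic; it moves into the new Hop.hasSparkOutput() helper introduced in the Hop.java hunk below. A minimal stand-alone sketch of the test, using simplified stand-in types rather than the real SystemDS class hierarchy:

```java
// Sketch: "does this hop's output already live in Spark?" Either the hop is
// planned for SPARK execution, or it is a DataOp whose data exists only as
// an RDD (no local in-memory copy to fall back on).
enum ExecType { CP, SPARK }

abstract class Hop {
	abstract ExecType optFindExecType();

	boolean hasSparkOutput() { // the helper this commit introduces
		return optFindExecType() == ExecType.SPARK
			|| (this instanceof DataOp && ((DataOp) this).hasOnlyRDD());
	}
}

class DataOp extends Hop {
	private final boolean onlyRDD;
	DataOp(boolean onlyRDD) { this.onlyRDD = onlyRDD; }
	boolean hasOnlyRDD() { return onlyRDD; }
	@Override ExecType optFindExecType() { return ExecType.CP; }
}
```
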
src/main/java/org/apache/sysds/hops/BinaryOp.java (+24, -14)

@@ -747,8 +747,8 @@ protected ExecType optFindExecType(boolean transitive) {
 
 		checkAndSetForcedPlatform();
 
-		DataType dt1 = getInput().get(0).getDataType();
-		DataType dt2 = getInput().get(1).getDataType();
+		final DataType dt1 = getInput(0).getDataType();
+		final DataType dt2 = getInput(1).getDataType();
 
 		if( _etypeForced != null ) {
 			setExecType(_etypeForced);
@@ -796,18 +796,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
 			checkAndSetInvalidCPDimsAndSize();
 		}
 
-		//spark-specific decision refinement (execute unary scalar w/ spark input and
+		// spark-specific decision refinement (execute unary scalar w/ spark input and
 		// single parent also in spark because it's likely cheap and reduces intermediates)
-		if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED &&
-			getDataType().isMatrix() // output should be a matrix
-			&& (dt1.isScalar() || dt2.isScalar()) // one side should be scalar
-			&& supportsMatrixScalarOperations() // scalar operations
-			&& !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint
-			&& getInput().get(dt1.isScalar() ? 1 : 0).getParent().size() == 1 // unary scalar is only parent
-			&& !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec
-			&& getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) {
-			// pull unary scalar operation into spark
-			_etype = ExecType.SPARK;
+		if(transitive // we allow transitive Spark operations. continue sequences of spark operations
+			&& _etype == ExecType.CP // The instruction is currently in CP
+			&& _etypeForced != ExecType.CP // not forced CP
+			&& _etypeForced != ExecType.FED // not federated
+			&& (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame
+		) {
+			final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize();
+			final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize();
+			final boolean left = v1 == true; // left side is the vector or scalar
+			final Hop sparkIn = getInput(left ? 1 : 0);
+			if((v1 ^ v2) // XOR only one side is allowed to be a vector or a scalar.
+				&& (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation
+				&& sparkIn.getParent().size() == 1 // only one parent
+				&& !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec
+				&& sparkIn.optFindExecType() == ExecType.SPARK // input was spark op.
+				&& !(sparkIn instanceof DataOp) // input is not checkpoint
+			) {
+				// pull operation into spark
+				_etype = ExecType.SPARK;
+			}
 		}
 
 		if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE &&
@@ -837,7 +847,7 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
 			|| (op == OpOp2.RBIND && getDataType().isList())) {
 			_etype = ExecType.CP;
 		}
-		
+
 		//mark for recompile (forever)
 		setRequiresRecompileIfNecessary();
 
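The rewritten refinement splits the old single condition in two: an outer gate (transitive, currently CP, not forced, matrix or frame output) and an inner check that exactly one operand is small (a scalar or a sub-blocksize vector) while the other operand is Spark-resident. A stand-alone sketch of the inner decision with the hop-specific checks reduced to booleans (hypothetical helper, for illustration only):

```java
// Hypothetical condensation of the inner check above. Exactly one input may
// be small (v1 ^ v2); the other, Spark-resident input is then worth keeping
// in Spark instead of pulling the whole chain back into CP.
class SparkPullDecision {
	static boolean pullBinaryIntoSpark(boolean v1, boolean v2, boolean opSupported,
		boolean singleParent, boolean singleBlock, boolean inputIsSpark, boolean isCheckpoint) {
		return (v1 ^ v2)      // exactly one side is a scalar/small vector
			&& opSupported    // matrix-scalar op, or APPLY_SCHEMA for frames
			&& singleParent   // the Spark input feeds only this operator
			&& !singleBlock   // more than one block, so distribution pays off
			&& inputIsSpark   // the big side is already a Spark op
			&& !isCheckpoint; // and not a checkpoint DataOp
	}
}
```

Note that the APPLY_SCHEMA escape hatch is what lets frame outputs (newly admitted by the outer gate) take this path at all, since supportsMatrixScalarOperations() covers only the matrix-scalar case.
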
src/main/java/org/apache/sysds/hops/Hop.java (+11, -0)

@@ -1040,6 +1040,12 @@ public final String toString() {
 	// ========================================================================================
 
 
+	protected boolean isScalarOrVectorBellowBlockSize(){
+		return getDataType().isScalar() || (dimsKnown() &&
+			(( _dc.getRows() == 1 && _dc.getCols() < ConfigurationManager.getBlocksize())
+			|| _dc.getCols() == 1 && _dc.getRows() < ConfigurationManager.getBlocksize()));
+	}
+
 	protected boolean isVector() {
 		return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) );
 	}
@@ -1624,6 +1630,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) {
 		lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this));
 	}
 
+	protected boolean hasSparkOutput(){
+		return (this.optFindExecType() == ExecType.SPARK
+			|| (this instanceof DataOp && ((DataOp)this).hasOnlyRDD()));
+	}
+
 	/**
 	 * Set parse information.
 	 *
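Both helpers added here back the exec-type refinements above: hasSparkOutput() centralizes the "already distributed" test, and isScalarOrVectorBellowBlockSize() (identifier spelling as committed) bounds the small side strictly below one block. A sketch of the predicate's boundary behavior, with the blocksize passed as a parameter instead of read from ConfigurationManager:

```java
// Illustrative re-statement of isScalarOrVectorBellowBlockSize: scalars
// always qualify; row/column vectors qualify only strictly below blocksize.
class BlockSizePredicate {
	static boolean smallEnough(boolean scalar, long rows, long cols, int blocksize) {
		return scalar
			|| (rows == 1 && cols < blocksize)
			|| (cols == 1 && rows < blocksize);
	}

	public static void main(String[] args) {
		System.out.println(smallEnough(false, 1, 999, 1000));   // true: row vector below one block
		System.out.println(smallEnough(false, 1, 1000, 1000));  // false: exactly one block does not count
		System.out.println(smallEnough(false, 500, 500, 1000)); // false: not a vector at all
		System.out.println(smallEnough(true, 0, 0, 1000));      // true: scalars short-circuit
	}
}
```
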
src/main/java/org/apache/sysds/hops/UnaryOp.java (+24, -10)

@@ -366,7 +366,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
 		} else {
 			sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
 		}
-		return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
+
+		if(getDataType() == DataType.FRAME)
+			return OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
+		else
+			return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
 	}
 
 	@Override
@@ -463,6 +467,13 @@ public boolean isMetadataOperation() {
 			|| _op == OpOp1.CAST_AS_LIST;
 	}
 
+	private boolean isDisallowedSparkOps(){
+		return isCumulativeUnaryOperation()
+			|| isCastUnaryOperation()
+			|| _op==OpOp1.MEDIAN
+			|| _op==OpOp1.IQM;
+	}
+
 	@Override
 	protected ExecType optFindExecType(boolean transitive)
 	{
@@ -493,19 +504,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVector() )
 			checkAndSetInvalidCPDimsAndSize();
 		}
 
+
 		//spark-specific decision refinement (execute unary w/ spark input and
 		//single parent also in spark because it's likely cheap and reduces intermediates)
-		if( _etype == ExecType.CP && _etypeForced != ExecType.CP
-			&& getInput().get(0).optFindExecType() == ExecType.SPARK
-			&& getDataType().isMatrix()
-			&& !isCumulativeUnaryOperation() && !isCastUnaryOperation()
-			&& _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM
-			&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
-			&& getInput().get(0).getParent().size()==1 ) //unary is only parent
-		{
+		if(_etype == ExecType.CP // currently CP instruction
+			&& _etype != ExecType.SPARK /// currently not SP.
+			&& _etypeForced != ExecType.CP // not forced as CP instruction
+			&& getInput(0).hasSparkOutput() // input is a spark instruction
+			&& (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame
+			&& !isDisallowedSparkOps() // is invalid spark instruction
+			// && !(getInput().get(0) instanceof DataOp) // input is not checkpoint
+			// && getInput(0).getParent().size() <= 1// unary is only parent
+		) {
 			//pull unary operation into spark
 			_etype = ExecType.SPARK;
 		}
+
 
 		//mark for recompile (forever)
 		setRequiresRecompileIfNecessary();
@@ -520,7 +534,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent
 		} else {
 			setRequiresRecompileIfNecessary();
 		}
-
+
 		return _etype;
 	}
 
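Two behavioral changes sit in this file: frame outputs now get a frame-specific memory estimate instead of the sparsity-scaled matrix estimate, and the Spark-disqualifying operations are gathered into one predicate. The two commented-out guards in the new condition (checkpoint input, single parent), together with the commit title, read as deliberately relaxed for this work in progress. A hedged sketch of the estimate dispatch, assuming a uniform per-cell cost purely for illustration (the real estimators live in OptimizerUtils and account for actual value types):

```java
// Hypothetical illustration of the new dispatch in computeOutputMemEstimate:
// frames ignore sparsity, matrices keep the sparsity-aware estimate.
class OutputEstimateSketch {
	static final double BYTES_PER_CELL = 8.0; // assumed uniform cell cost, illustration only

	static double estimate(boolean isFrame, long rows, long cols, double sparsity) {
		return isFrame
			? rows * cols * BYTES_PER_CELL             // frame: size independent of sparsity
			: rows * cols * sparsity * BYTES_PER_CELL; // matrix: sparsity-aware
	}
}
```
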
src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java (+3, -3)

@@ -58,8 +58,8 @@
 import org.apache.sysds.runtime.compress.lib.CLALibMMChain;
 import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
 import org.apache.sysds.runtime.compress.lib.CLALibMerge;
-import org.apache.sysds.runtime.compress.lib.CLALibReplace;
 import org.apache.sysds.runtime.compress.lib.CLALibReorg;
+import org.apache.sysds.runtime.compress.lib.CLALibReplace;
 import org.apache.sysds.runtime.compress.lib.CLALibReshape;
 import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
 import org.apache.sysds.runtime.compress.lib.CLALibScalar;
@@ -1202,8 +1202,8 @@ public void examSparsity(boolean allowCSR, int k) {
 	}
 
 	@Override
-	public void sparseToDense(int k) {
-		// do nothing
+	public MatrixBlock sparseToDense(int k) {
+		return this; // do nothing
 	}
 
 	@Override

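Besides the alphabetical import reorder, sparseToDense changes from void to returning a MatrixBlock, so call sites can use the conversion result fluently; a compressed block has no sparse representation to materialize and simply returns itself. A stand-in sketch of the pattern (simplified types, not the real class hierarchy):

```java
// Stand-in types illustrating the void -> MatrixBlock signature change.
abstract class Block {
	abstract Block sparseToDense(int k); // was: void sparseToDense(int k)
}

class CompressedBlock extends Block {
	@Override
	Block sparseToDense(int k) {
		return this; // nothing to convert; the caller keeps working with `this`
	}
}
```
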
src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java (+21, -24)

@@ -90,10 +90,7 @@ public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc
 			JavaRDD<String> tmp = input.values()
 				.map(new TextToStringFunction());
 			String tmpStr = tmp.first();
-			boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX)
-				|| tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX);
-			tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr;
-			long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0);
+			long rlen = tmp.count() ;
 			long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length;
 			mc.set(rlen, clen, mc.getBlocksize(), -1);
 		}
@@ -582,14 +579,14 @@ public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0)
 					_colnames = row.split(_delim);
 					continue;
 				}
-				if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
-					_mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
-					continue;
-				}
-				else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) {
-					_ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
-					continue;
-				}
+				// if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
+				// 	_mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
+				// 	continue;
+				// }
+				// else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) {
+				// 	_ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1));
+				// 	continue;
+				// }
 
 				//adjust row index for header and meta data
 				rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2);
@@ -670,18 +667,18 @@ public Iterator<String> call(Tuple2<Long, FrameBlock> arg0)
 					ret.add(sb.toString());
 					sb.setLength(0); //reset
 				}
-				if( !blk.isColumnMetadataDefault() ) {
-					sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim());
-					for( int j=0; j<blk.getNumColumns(); j++ )
-						sb.append(blk.getColumnMetadata(j).getMvValue() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
-					ret.add(sb.toString());
-					sb.setLength(0); //reset
-					sb.append(TfUtils.TXMTD_NDPREFIX + _props.getDelim());
-					for( int j=0; j<blk.getNumColumns(); j++ )
-						sb.append(blk.getColumnMetadata(j).getNumDistinct() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
-					ret.add(sb.toString());
-					sb.setLength(0); //reset
-				}
+				// if( !blk.isColumnMetadataDefault() ) {
+				// 	sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim());
+				// 	for( int j=0; j<blk.getNumColumns(); j++ )
+				// 		sb.append(blk.getColumnMetadata(j).getMvValue() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
+				// 	ret.add(sb.toString());
+				// 	sb.setLength(0); //reset
+				// 	sb.append(TfUtils.TXMTD_NDPREFIX + _props.getDelim());
+				// 	for( int j=0; j<blk.getNumColumns(); j++ )
+				// 		sb.append(blk.getColumnMetadata(j).getNumDistinct() + ((j<blk.getNumColumns()-1)?_props.getDelim():""));
+				// 	ret.add(sb.toString());
+				// 	sb.setLength(0); //reset
+				// }
 			}
 
 			//handle Frame block data

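With the metadata handling commented out, csvToBinaryBlock derives rlen from the raw line count; the old code subtracted the header row and the two TXMTD metadata rows. Note that the new line keeps no header adjustment either. The accounting difference, restated as a sketch:

```java
// Row-count accounting in csvToBinaryBlock, before vs. after this commit.
class RlenSketch {
	static long before(long lines, boolean hasHeader, boolean metaHeader) {
		return lines - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0);
	}
	static long after(long lines) {
		return lines; // no header/metadata rows are discounted anymore
	}
}
```
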
src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSV.java (+1, -12)

@@ -31,7 +31,6 @@
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.frame.data.iterators.IteratorFactory;
-import org.apache.sysds.runtime.transform.TfUtils;
 import org.apache.sysds.runtime.util.HDFSTool;
 
 /**
@@ -107,17 +106,7 @@ protected static void writeCSVFrameToFile( Path path, JobConf job, FileSystem fs
 			}
 			sb.append('\n');
 		}
-		//append meta data
-		if( !src.isColumnMetadataDefault() ) {
-			sb.append(TfUtils.TXMTD_MVPREFIX + delim);
-			for( int j=0; j<cols; j++ )
-				sb.append(src.getColumnMetadata(j).getMvValue() + ((j<cols-1)?delim:""));
-			sb.append("\n");
-			sb.append(TfUtils.TXMTD_NDPREFIX + delim);
-			for( int j=0; j<cols; j++ )
-				sb.append(src.getColumnMetadata(j).getNumDistinct() + ((j<cols-1)?delim:""));
-			sb.append("\n");
-		}
+
 		br.write( sb.toString() );
 		sb.setLength(0);
 	}

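This writer change is the other half of the reader changes in FrameRDDConverterUtils above and IOUtilFunctions below: the TfUtils.TXMTD_MVPREFIX / TXMTD_NDPREFIX column-metadata rows are no longer serialized into the CSV body, so the readers no longer need to detect and skip them, and the TfUtils import is dropped on both sides. The format change therefore stays consistent end to end within this commit.
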
src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java (+4, -5)

@@ -76,7 +76,6 @@
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixCell;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.transform.TfUtils;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
 import io.airlift.compress.lzo.LzoCodec;
@@ -696,10 +695,10 @@ public static int countNumColumnsCSV(InputSplit[] splits, InputFormat informat,
 		try {
 			if( reader.next(key, value) ) {
 				boolean hasValue = true;
-				if( value.toString().startsWith(TfUtils.TXMTD_MVPREFIX) )
-					hasValue = reader.next(key, value);
-				if( value.toString().startsWith(TfUtils.TXMTD_NDPREFIX) )
-					hasValue = reader.next(key, value);
+				// if( value.toString().startsWith(TfUtils.TXMTD_MVPREFIX) )
+				// 	hasValue = reader.next(key, value);
+				// if( value.toString().startsWith(TfUtils.TXMTD_NDPREFIX) )
+				// 	hasValue = reader.next(key, value);
 				String row = value.toString().trim();
 				if( hasValue && !row.isEmpty() ) {
 					ncol = IOUtilFunctions.countTokensCSV(row, delim);

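With the metadata skip commented out, countNumColumnsCSV simply tokenizes the first row it reads. A minimal stand-alone rendering of the remaining logic (hypothetical helper; the real code uses countTokensCSV over Hadoop record readers, so this is an approximation of the semantics, not the implementation):

```java
// Sketch: column counting now just tokenizes the first non-empty row.
class ColumnCountSketch {
	static int countColumns(String firstRow, String delim) {
		String row = firstRow.trim();
		if (row.isEmpty())
			return 0; // hypothetical fallback; the real code keeps reading rows
		return row.split(java.util.regex.Pattern.quote(delim), -1).length;
	}

	public static void main(String[] args) {
		System.out.println(countColumns("a,b,c", ",")); // 3
	}
}
```
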