From 0c121eb7b3b0cba64a89abaf9eae42c641eb38b9 Mon Sep 17 00:00:00 2001 From: Tuluyhan Sozen Date: Thu, 2 Jul 2026 11:58:56 +0200 Subject: [PATCH] [MINOR] Add OOC covariance and TSMM coverage --- .../sysds/runtime/functionobjects/COV.java | 2 + .../instructions/OOCInstructionParser.java | 5 +- .../ooc/CovarianceOOCInstruction.java | 123 +++++++++++++++ .../instructions/ooc/OOCInstruction.java | 2 +- .../instructions/ooc/TSMMOOCInstruction.java | 116 ++++++++++---- .../test/functions/ooc/CovarianceTest.java | 130 ++++++++++++++++ .../functions/ooc/CovarianceWeightsTest.java | 144 ++++++++++++++++++ .../functions/ooc/TransposeSelfMMTest.java | 144 +++++++++++++----- src/test/scripts/functions/ooc/Covariance.dml | 28 ++++ .../functions/ooc/CovarianceWeights.dml | 29 ++++ src/test/scripts/functions/ooc/TSMM.dml | 2 +- src/test/scripts/functions/ooc/TSMMRight.dml | 28 ++++ 12 files changed, 682 insertions(+), 71 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java create mode 100644 src/test/scripts/functions/ooc/Covariance.dml create mode 100644 src/test/scripts/functions/ooc/CovarianceWeights.dml create mode 100644 src/test/scripts/functions/ooc/TSMMRight.dml diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/COV.java b/src/main/java/org/apache/sysds/runtime/functionobjects/COV.java index 836bf972ce6..36be4e8b589 100644 --- a/src/main/java/org/apache/sysds/runtime/functionobjects/COV.java +++ b/src/main/java/org/apache/sysds/runtime/functionobjects/COV.java @@ -63,6 +63,8 @@ private COV() { public Data execute(Data in1, double u, double v, double w2) { CmCovObject cov1=(CmCovObject) in1; + if(w2 == 0) + return cov1; if(cov1.isCOVAllZeros()) { cov1.w=w2; diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 98a454283e2..10950d5ab6d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -29,9 +29,10 @@ import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.CovarianceOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction; -import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.DataGenOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.IndexingOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ParameterizedBuiltinOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; @@ -103,6 +104,8 @@ else if(parts.length == 4) return TeeOOCInstruction.parseInstruction(str); case CentralMoment: return CentralMomentOOCInstruction.parseInstruction(str); + case Covariance: + return CovarianceOOCInstruction.parseInstruction(str); case Ctable: return CtableOOCInstruction.parseInstruction(str); case ParameterizedBuiltin: diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java new file mode 100644 index 00000000000..e1032e4aaa6 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CovarianceOOCInstruction.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.instructions.ooc; + +import java.util.List; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.functionobjects.COV; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.CmCovObject; +import org.apache.sysds.runtime.instructions.cp.DoubleObject; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.COVOperator; +import org.apache.sysds.runtime.meta.DataCharacteristics; + +public class CovarianceOOCInstruction extends ComputationOOCInstruction { + + private CovarianceOOCInstruction(COVOperator cov, CPOperand in1, CPOperand in2, CPOperand in3, CPOperand out, + String opcode, String str) { + super(OOCType.COV, cov, in1, in2, in3, out, opcode, str); + } + + public static 
CovarianceOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + String opcode = parts[0]; + + if(!opcode.equalsIgnoreCase(Opcodes.COV.toString())) + throw new DMLRuntimeException("CovarianceOOCInstruction.parseInstruction():: Unknown opcode " + opcode); + + // the OOC instruction string matches the Spark format. + + COVOperator cov = new COVOperator(COV.getCOMFnObject()); + if(parts.length == 4) { // this is the case for unweighted cov.A.B.out + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand out = new CPOperand(parts[3]); + return new CovarianceOOCInstruction(cov, in1, in2, null, out, opcode, str); + } + else if(parts.length == 5) {// this is the case for weighted cov.A.B.W.out + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand in3 = new CPOperand(parts[3]); + CPOperand out = new CPOperand(parts[4]); + return new CovarianceOOCInstruction(cov, in1, in2, in3, out, opcode, str); + } + else { + throw new DMLRuntimeException("Invalid number of arguments in Instruction: " + str); + } + } + + @Override + public void processInstruction(ExecutionContext ec) { + COVOperator cov_op = (COVOperator) _optr; + + MatrixObject mo1 = ec.getMatrixObject(input1.getName()); + MatrixObject mo2 = ec.getMatrixObject(input2.getName()); + + OOCStream q1 = mo1.getStreamHandle(); + OOCStream q2 = mo2.getStreamHandle(); + + OOCStream covObjs = createWritableStream(); + + if(input3 == null) { + // unweighted covariance: join the two tile streams by block index + joinOOC(q1, q2, covObjs, + (a, b) -> ((MatrixBlock) a.getValue()).covOperations(cov_op, (MatrixBlock) b.getValue()), + IndexedMatrixValue::getIndexes); + } + else { + // weighted covariance: additionally join the weights tile stream + MatrixObject mo3 = ec.getMatrixObject(input3.getName()); + + DataCharacteristics dc1 = ec.getDataCharacteristics(input1.getName()); +
DataCharacteristics dc2 = ec.getDataCharacteristics(input2.getName()); + DataCharacteristics dc3 = ec.getDataCharacteristics(input3.getName()); + if(dc1.getBlocksize() != dc2.getBlocksize() || dc1.getBlocksize() != dc3.getBlocksize()) + throw new DMLRuntimeException("Different block sizes are not yet supported"); + + OOCStream q3 = mo3.getStreamHandle(); + + joinOOC(List.of(q1, q2, q3), covObjs, + tiles -> ((MatrixBlock) tiles.get(0).getValue()).covOperations(cov_op, + (MatrixBlock) tiles.get(1).getValue(), (MatrixBlock) tiles.get(2).getValue()), + IndexedMatrixValue::getIndexes); + } + + try { + CmCovObject agg = covObjs.dequeue(); + CmCovObject next; + + while((next = covObjs.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) + agg = (CmCovObject) cov_op.fn.execute(agg, next); + + ec.setScalarOutput(output.getName(), new DoubleObject(agg.getRequiredResult(cov_op))); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index 80d71231646..50a73869424 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -80,7 +80,7 @@ public abstract class OOCInstruction extends Instruction { public enum OOCType { Reblock, Tee, Binary, Ternary, Unary, AggregateUnary, AggregateBinary, AggregateTernary, MAPMM, MMTSJ, - MAPMMCHAIN, Reorg, CM, Ctable, MatrixIndexing, ParameterizedBuiltin, Rand, Append, Quaternary, Reshape + MAPMMCHAIN, Reorg, CM, COV, Ctable, MatrixIndexing, ParameterizedBuiltin, Rand, Append, Quaternary, Reshape } protected final OOCInstruction.OOCType _ooctype; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java index e0207940409..e93d8fd3f75 100644 
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TSMMOOCInstruction.java @@ -19,18 +19,23 @@ package org.apache.sysds.runtime.instructions.ooc; +import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.apache.sysds.common.Opcodes; import org.apache.sysds.lops.MMTSJ; import org.apache.sysds.lops.MMTSJ.MMTSJType; +import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.functionobjects.Multiply; import org.apache.sysds.runtime.functionobjects.Plus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; @@ -48,7 +53,7 @@ public static TSMMOOCInstruction parseInstruction(String str) { String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); InstructionUtils.checkNumFields(parts, 3); String opcode = parts[0]; - CPOperand in1 = new CPOperand(parts[1]); // the large matrix (streamed), columns <= blocksize + CPOperand in1 = new CPOperand(parts[1]); // the large matrix (streamed) CPOperand out = new CPOperand(parts[2]); MMTSJ.MMTSJType mmtsjType = MMTSJ.MMTSJType.valueOf(parts[3]); @@ -59,39 +64,90 @@ public static TSMMOOCInstruction parseInstruction(String str) { } @Override - 
public void processInstruction( ExecutionContext ec ) { + public void processInstruction(ExecutionContext ec) { MatrixObject min = ec.getMatrixObject(input1); - int nRows = (int) min.getDataCharacteristics().getRows(); - int nCols = (int) min.getDataCharacteristics().getCols(); - int bLen = min.getDataCharacteristics().getBlocksize(); - - OOCStream qIn = min.getStreamHandle(); + int numRowBlocks = Math.toIntExact(min.getDataCharacteristics().getNumRowBlocks()); + int numColBlocks = Math.toIntExact(min.getDataCharacteristics().getNumColBlocks()); + int blocksPerJoinGroup = _type.isLeft() ? numColBlocks : numRowBlocks; + int partialsPerOutput = _type.isLeft() ? numRowBlocks : numColBlocks; + + OOCStreamable inputStreamable = min.getStreamable(); + final boolean createdCache = !inputStreamable.hasStreamCache(); + final CachingStream inputCache = createdCache ? new CachingStream(min.getStreamHandle()) + : inputStreamable.getStreamCache(); + + OOCStream> groupedPartials = createWritableStream(); + OOCStream partials = createWritableStream(); + OOCStream out = createWritableStream(); + addOutStream(out); + ec.getMatrixObject(output).setStreamHandle(out); + + CompletableFuture joinFuture = joinManyOOC(inputCache.getReadStream(), inputCache.getReadStream(), groupedPartials, + this::createPartialOutputTiles, this::getJoinIndex, this::getJoinIndex, + blocksPerJoinGroup, blocksPerJoinGroup); + CompletableFuture expandFuture = expandOOC(groupedPartials, partials, values -> values); + BinaryOperator plus = InstructionUtils.parseBinaryOperator(Opcodes.PLUS.toString()); + CompletableFuture outFuture = groupedReduceOOC(partials, out, (left, right) -> { + MatrixBlock result = ((MatrixBlock) left.getValue()).binaryOperations(plus, right.getValue()); + left.setValue(result); + return left; + }, partialsPerOutput); + + propagateFailuresToOutput(out, List.of(joinFuture, expandFuture, outFuture)); + + outFuture.whenComplete((result, error) -> { + if(createdCache) + 
inputCache.scheduleDeletion(); + }); + } + + private long getJoinIndex(IndexedMatrixValue value) { + return _type.isLeft() ? value.getIndexes().getRowIndex() : value.getIndexes().getColumnIndex(); + } - //validation check TODO extend compiler to not create OOC otherwise - if( (_type.isLeft() && nCols > bLen) - || (_type.isRight() && nRows > bLen) ) - { - throw new UnsupportedOperationException(); + private long getOutputIndex(IndexedMatrixValue value) { + return _type.isLeft() ? value.getIndexes().getColumnIndex() : value.getIndexes().getRowIndex(); + } + + private List createPartialOutputTiles(IndexedMatrixValue left, IndexedMatrixValue right) { + long leftIndex = getOutputIndex(left); + long rightIndex = getOutputIndex(right); + if(leftIndex > rightIndex) + return List.of(); + + MatrixBlock leftBlock = (MatrixBlock) left.getValue(); + MatrixBlock rightBlock = (MatrixBlock) right.getValue(); + if(leftIndex == rightIndex) { + MatrixBlock diagonal = leftBlock.transposeSelfMatrixMultOperations(new MatrixBlock(), _type); + return List.of(new IndexedMatrixValue(new MatrixIndexes(leftIndex, rightIndex), diagonal)); } - - //int dim = _type.isLeft() ? 
nCols : nRows; - MatrixBlock resultBlock = null; - - OOCStream tmpStream = createWritableStream(); - - mapOOC(qIn, tmpStream, - tmp -> ((MatrixBlock) tmp.getValue()) - .transposeSelfMatrixMultOperations(new MatrixBlock(), _type)); - - MatrixBlock tmp; - while ((tmp = tmpStream.dequeue()) != LocalTaskQueue.NO_MORE_TASKS) { - if (resultBlock == null) - resultBlock = tmp; - else - resultBlock.binaryOperationsInPlace(plus, tmp); + + MatrixBlock partial = multiplyOffDiagonal(leftBlock, rightBlock); + MatrixBlock mirror = LibMatrixReorg.transpose(partial); + return List.of( + new IndexedMatrixValue(new MatrixIndexes(leftIndex, rightIndex), partial), + new IndexedMatrixValue(new MatrixIndexes(rightIndex, leftIndex), mirror)); + } + + private MatrixBlock multiplyOffDiagonal(MatrixBlock leftBlock, MatrixBlock rightBlock) { + if(_type.isLeft()) { + MatrixBlock leftTranspose = LibMatrixReorg.transpose(leftBlock); + return leftTranspose.aggregateBinaryOperations(leftTranspose, rightBlock, new MatrixBlock(), + (AggregateBinaryOperator) _optr); } - ec.setMatrixOutput(output.getName(), resultBlock); + MatrixBlock rightTranspose = LibMatrixReorg.transpose(rightBlock); + return leftBlock.aggregateBinaryOperations(leftBlock, rightTranspose, new MatrixBlock(), + (AggregateBinaryOperator) _optr); + } + + private static void propagateFailuresToOutput(OOCStream out, List> futures) { + for(CompletableFuture future : futures) { + future.exceptionally(error -> { + out.propagateFailure(DMLRuntimeException.of(error)); + return null; + }); + } } } diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java new file mode 100644 index 00000000000..34996f09346 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import java.io.IOException; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +public class CovarianceTest extends AutomatedTestBase { + private final static String TEST_NAME = "Covariance"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CovarianceTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static String INPUT_A = "A"; + private final static String INPUT_B = "B"; + private final static String OUTPUT_CP = "R_CP"; + private final static String OUTPUT_OOC = "R_OOC"; + + private final static int rows = 1871; + private final static int cols = 1; + private final static int blocksize = 1000; + private final static int maxVal = 7; + + 
private final static double denseSparsity = 0.65; + private final static double sparseSparsity = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { OUTPUT_CP, OUTPUT_OOC })); + } + + @Test + public void testCovarianceDenseOOC() { + runCovarianceOOCCompareTest(false); + } + + @Test + public void testCovarianceSparseOOC() { + runCovarianceOOCCompareTest(true); + } + + private void runCovarianceOOCCompareTest(boolean sparse) { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + double sparsity = sparse ? sparseSparsity : denseSparsity; + + double[][] A = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 7); + double[][] B = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 823); + + MatrixBlock ABlock = DataConverter.convertToMatrixBlock(A); + MatrixBlock BBlock = DataConverter.convertToMatrixBlock(B); + + writeBinaryMatrix(INPUT_A, ABlock, rows, cols, blocksize); + writeBinaryMatrix(INPUT_B, BBlock, rows, cols, blocksize); + + // Reference run: normal single-node CP execution. + programArgs = new String[] { + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_CP) + }; + runTest(true, false, null, -1); + + // OOC run: compare the out-of-core covariance path against CP. 
+ programArgs = new String[] { + "-explain", "-stats", "-ooc", + "-args", input(INPUT_A), input(INPUT_B), output(OUTPUT_OOC) + }; + runTest(true, false, null, -1); + + MatrixBlock cpResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_CP), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + MatrixBlock oocResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_OOC), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + TestUtils.compareMatrices(cpResult, oocResult, eps); + } + catch(IOException ex) { + throw new RuntimeException(ex); + } + finally { + resetExecMode(platformOld); + } + } + + private void writeBinaryMatrix(String name, MatrixBlock mb, int rows, int cols, int blocksize) throws IOException { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + writer.writeMatrixToHDFS(mb, input(name), rows, cols, blocksize, mb.getNonZeros()); + + HDFSTool.writeMetaDataFile(input(name + ".mtd"), + Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, blocksize, mb.getNonZeros()), + Types.FileFormat.BINARY); + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java new file mode 100644 index 00000000000..9e63efef3a7 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CovarianceWeightsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import java.io.IOException; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +public class CovarianceWeightsTest extends AutomatedTestBase { + private final static String TEST_NAME = "CovarianceWeights"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CovarianceWeightsTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + + private final static String INPUT_A = "A"; + private final static String INPUT_B = "B"; + private final static String INPUT_W = "W"; + private final static String OUTPUT_CP = "R_CP"; + private final static String OUTPUT_OOC = "R_OOC"; + + private final static int rows = 1871; + private final static int cols = 1; + private final static int blocksize = 1000; + private final static int maxVal = 7; + + private final static double denseSparsity = 0.65; + private final static double sparseSparsity = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + 
addTestConfiguration(TEST_NAME, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { OUTPUT_CP, OUTPUT_OOC })); + } + + @Test + public void testWeightedCovarianceDenseOOC() { + runWeightedCovarianceOOCCompareTest(false); + } + + @Test + public void testWeightedCovarianceSparseOOC() { + runWeightedCovarianceOOCCompareTest(true); + } + + private void runWeightedCovarianceOOCCompareTest(boolean sparse) { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + double sparsity = sparse ? sparseSparsity : denseSparsity; + + double[][] A = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 7); + double[][] B = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 823); + + // Weights must not be negative. NOTE(review): sparsity < 1 presumably leaves zero weights in W, so weights are non-negative rather than strictly positive - confirm. + double[][] W = getRandomMatrix(rows, cols, 1, maxVal, sparsity, 1234); + + MatrixBlock ABlock = DataConverter.convertToMatrixBlock(A); + MatrixBlock BBlock = DataConverter.convertToMatrixBlock(B); + MatrixBlock WBlock = DataConverter.convertToMatrixBlock(W); + + writeBinaryMatrix(INPUT_A, ABlock, rows, cols, blocksize); + writeBinaryMatrix(INPUT_B, BBlock, rows, cols, blocksize); + writeBinaryMatrix(INPUT_W, WBlock, rows, cols, blocksize); + + // Reference run: normal single-node CP execution. + programArgs = new String[] { + "-args", + input(INPUT_A), + input(INPUT_B), + input(INPUT_W), + output(OUTPUT_CP) + }; + runTest(true, false, null, -1); + + // OOC run: compare the out-of-core weighted covariance path against CP.
+ programArgs = new String[] { + "-explain", "-stats", "-ooc", + "-args", + input(INPUT_A), + input(INPUT_B), + input(INPUT_W), + output(OUTPUT_OOC) + }; + runTest(true, false, null, -1); + + MatrixBlock cpResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_CP), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + MatrixBlock oocResult = DataConverter.readMatrixFromHDFS( + output(OUTPUT_OOC), Types.FileFormat.BINARY, 1, 1, blocksize, 1); + + TestUtils.compareMatrices(cpResult, oocResult, eps); + } + catch(IOException ex) { + throw new RuntimeException(ex); + } + finally { + resetExecMode(platformOld); + } + } + + private void writeBinaryMatrix(String name, MatrixBlock mb, int rows, int cols, int blocksize) throws IOException { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); + writer.writeMatrixToHDFS(mb, input(name), rows, cols, blocksize, mb.getNonZeros()); + + HDFSTool.writeMetaDataFile(input(name + ".mtd"), + Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, blocksize, mb.getNonZeros()), + Types.FileFormat.BINARY); + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java index ed61038a716..3b7adf3a350 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/TransposeSelfMMTest.java @@ -21,7 +21,8 @@ import org.apache.sysds.common.Opcodes; import org.apache.sysds.common.Types; -import org.apache.sysds.lops.MMTSJ; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.lops.MMTSJ.MMTSJType; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.io.MatrixWriterFactory; @@ -36,76 +37,107 @@ import org.junit.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; public 
class TransposeSelfMMTest extends AutomatedTestBase { - private final static String TEST_NAME1 = "TSMM"; + private static final String TEST_NAME_LEFT = "TSMM"; + private static final String TEST_NAME_RIGHT = "TSMMRight"; private final static String TEST_DIR = "functions/ooc/"; private final static String TEST_CLASS_DIR = TEST_DIR + TransposeSelfMMTest.class.getSimpleName() + "/"; private final static double eps = 1e-8; private static final String INPUT_NAME = "X"; - private static final String OUTPUT_NAME = "res"; + private static final String OUTPUT_NAME_CP = "res_cp"; + private static final String OUTPUT_NAME_OOC = "res_ooc"; - private final static int rows = 2143; - private final static int cols = 123; + private static final int SINGLE_TILE_ROWS = 2143; + private static final int SINGLE_TILE_COLS = 123; + private static final int SINGLE_TILE_BLOCK_SIZE = 1000; + private static final int MULTI_TILE_ROWS = 1501; + private static final int MULTI_TILE_COLS = 1301; + private static final int MULTI_TILE_BLOCK_SIZE = 500; private final static double sparsity1 = 0.7; private final static double sparsity2 = 0.1; - private final int k = 1; @Override public void setUp() { TestUtils.clearAssertionInformation(); - TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); - addTestConfiguration(TEST_NAME1, config); + addTestConfiguration(TEST_NAME_LEFT, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME_LEFT)); + addTestConfiguration(TEST_NAME_RIGHT, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME_RIGHT)); } @Test - public void testTsmmDense() { - runTSMMTest(cols, false); + public void testTsmmLeftDenseSingleTile() { + runTSMMTest(MMTSJType.LEFT, SINGLE_TILE_ROWS, SINGLE_TILE_COLS, SINGLE_TILE_BLOCK_SIZE, false); } - + @Test - public void testTsmmSparse() { - runTSMMTest(cols, false); + public void testTsmmLeftSparseSingleTile() { + runTSMMTest(MMTSJType.LEFT, SINGLE_TILE_ROWS, SINGLE_TILE_COLS, SINGLE_TILE_BLOCK_SIZE, true); } - private void runTSMMTest(int 
cols, boolean sparse ) - { + /** RIGHT TSMM with sparse=false on the single-tile constants. NOTE(review): SINGLE_TILE_COLS is passed as rows and SINGLE_TILE_ROWS as cols, unlike the multi-tile tests; presumably intentional, confirm. */ @Test + public void testTsmmRightDenseSingleTile() { + runTSMMTest(MMTSJType.RIGHT, SINGLE_TILE_COLS, SINGLE_TILE_ROWS, SINGLE_TILE_BLOCK_SIZE, false); + } + + /** LEFT TSMM with sparse=false on the multi-tile constants. */ @Test + public void testTsmmLeftDenseMultiTile() { + runTSMMTest(MMTSJType.LEFT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, false); + } + + /** LEFT TSMM with sparse=true on the multi-tile constants. */ @Test + public void testTsmmLeftSparseMultiTile() { + runTSMMTest(MMTSJType.LEFT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, true); + } + + /** RIGHT TSMM with sparse=false on the multi-tile constants. */ @Test + public void testTsmmRightDenseMultiTile() { + runTSMMTest(MMTSJType.RIGHT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, false); + } + + /** RIGHT TSMM with sparse=true on the multi-tile constants. */ @Test + public void testTsmmRightSparseMultiTile() { + runTSMMTest(MMTSJType.RIGHT, MULTI_TILE_ROWS, MULTI_TILE_COLS, MULTI_TILE_BLOCK_SIZE, true); + } + + /** Writes a random rows x cols binary input, runs the LEFT or RIGHT script once without and once with -ooc, asserts the OOC tsmm instruction shows up in the heavy hitters, checks the OOC output metadata, and compares the OOC output with the output of the run without -ooc. The sparse flag selects sparsity2 over sparsity1 for data generation. An IOException is rethrown wrapped in a RuntimeException. */ private void runTSMMTest(MMTSJType type, int rows, int cols, int blockSize, boolean sparse) { Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); - try - { - getAndLoadTestConfiguration(TEST_NAME1); + try { + String testName = type.isLeft() ? TEST_NAME_LEFT : TEST_NAME_RIGHT; + getAndLoadTestConfiguration(testName); + /* write blockSize into the test config as default block size; the input below is written with the same blockSize */ setDefaultBlockSizeInConfig(blockSize); String HOME = SCRIPT_DIR + TEST_DIR; - fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; - programArgs = new String[]{"-explain", "-stats", "-ooc", - "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + fullDMLScriptName = HOME + testName + ".dml"; - // 1. Generate the data in-memory as MatrixBlock objects double[][] A_data = getRandomMatrix(rows, cols, 0, 1, sparse?sparsity2:sparsity1, 10); - - // 2. Convert the double arrays to MatrixBlock objects MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); - - // 3. Create a binary matrix writer MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.BINARY); - - // 4.
Write matrix A to a binary SequenceFile - writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros()); + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, blockSize, A_mb.getNonZeros()); HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64, - new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.BINARY); + new MatrixCharacteristics(rows, cols, blockSize, A_mb.getNonZeros()), Types.FileFormat.BINARY); + + /* reference run: same script and input, no -ooc flag, result goes to OUTPUT_NAME_CP */ programArgs = new String[] {"-stats", "-args", input(INPUT_NAME), output(OUTPUT_NAME_CP)}; + runTest(true, false, null, -1); + /* run under test: adds -ooc, result goes to OUTPUT_NAME_OOC */ programArgs = new String[] {"-explain", "-stats", "-ooc", + "-args", input(INPUT_NAME), output(OUTPUT_NAME_OOC)}; runTest(true, false, null, -1); - //check tsmm OOC Assert.assertTrue("OOC wasn't used for TSMM", heavyHittersContainsString(Instruction.OOC_INST_PREFIX + Opcodes.TSMM)); - - //compare results - MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME), - Types.FileFormat.BINARY, cols, cols, 1000, cols*cols); - MatrixBlock ret2 = new MatrixBlock(rows, rows, false); - A_mb.transposeSelfMatrixMultOperations(ret2, MMTSJ.MMTSJType.LEFT, k); - TestUtils.compareMatrices(ret1, ret2, eps); + + MatrixCharacteristics meta = readDMLMetaDataFile(OUTPUT_NAME_OOC); + int outputDim = assertOutputMetadata(type, meta, rows, cols, blockSize); + assertDeepMultiTileOutput(meta); + + MatrixBlock actual = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME_OOC), + Types.FileFormat.BINARY, outputDim, outputDim, blockSize); + MatrixBlock expected = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME_CP), + Types.FileFormat.BINARY, outputDim, outputDim, blockSize); + TestUtils.compareMatrices(actual, expected, eps); + assertSymmetricOffDiagonal(actual, outputDim, blockSize); } catch (IOException e) { throw new RuntimeException(e); @@ -114,4 +146,40 @@ private void runTSMMTest(int cols, boolean sparse ) resetExecMode(platformOld); } } + + /** Asserts that the output metadata is square with dimension inputCols for a LEFT type and inputRows otherwise, and that it carries the given block size; returns that dimension. */ private
static int assertOutputMetadata(MMTSJType type, MatrixCharacteristics meta, int inputRows, int inputCols, + int blockSize) { + int outputDim = type.isLeft() ? inputCols : inputRows; + Assert.assertEquals(type + " TSMM output row metadata", outputDim, meta.getRows()); + Assert.assertEquals(type + " TSMM output column metadata", outputDim, meta.getCols()); + Assert.assertEquals(type + " TSMM output blocksize metadata", blockSize, meta.getBlocksize()); + return outputDim; + } + + /** Spot-checks symmetry of the output: compares cell (row, col) with cell (col, row) within eps for rows 0 and min(blockSize - 1, outputDim - 1) against cols blockSize and outputDim - 1. Does nothing when outputDim is not larger than blockSize. */ private static void assertSymmetricOffDiagonal(MatrixBlock actual, int outputDim, int blockSize) { + if(outputDim <= blockSize) + return; + + int[] rows = new int[] {0, Math.min(blockSize - 1, outputDim - 1)}; + int[] cols = new int[] {blockSize, outputDim - 1}; + for(int row : rows) + for(int col : cols) + Assert.assertEquals(actual.get(row, col), actual.get(col, row), eps); + } + + /** When the output has more rows than its block size, asserts that the metadata reports at least three row blocks and at least three column blocks; does nothing otherwise. */ private static void assertDeepMultiTileOutput(MatrixCharacteristics meta) { + if(meta.getRows() <= meta.getBlocksize()) + return; + + Assert.assertTrue("Multi-tile TSMM tests should cover at least three output row blocks", + meta.getNumRowBlocks() >= 3); + Assert.assertTrue("Multi-tile TSMM tests should cover at least three output column blocks", + meta.getNumColBlocks() >= 3); + } + + /** Loads the current test config file, sets DMLConfig.DEFAULT_BLOCK_SIZE to the given block size, and writes the serialized config back to the same file as UTF-8 bytes. */ private void setDefaultBlockSizeInConfig(int blockSize) throws IOException { + DMLConfig config = new DMLConfig(getCurConfigFile().getPath()); + config.setTextValue(DMLConfig.DEFAULT_BLOCK_SIZE, String.valueOf(blockSize)); + Files.write(getCurConfigFile().toPath(), config.serializeDMLConfig().getBytes(StandardCharsets.UTF_8)); + } } diff --git a/src/test/scripts/functions/ooc/Covariance.dml b/src/test/scripts/functions/ooc/Covariance.dml new file mode 100644 index 00000000000..675f35f43cf --- /dev/null +++ b/src/test/scripts/functions/ooc/Covariance.dml @@ -0,0 +1,28 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = read($1); +B = read($2); + +s = cov(A, B); # covariance of the two inputs +res = as.matrix(s); # cast the result to a matrix for the binary write below + +write(res, $3, format="binary"); diff --git a/src/test/scripts/functions/ooc/CovarianceWeights.dml b/src/test/scripts/functions/ooc/CovarianceWeights.dml new file mode 100644 index 00000000000..2c077a9699a --- /dev/null +++ b/src/test/scripts/functions/ooc/CovarianceWeights.dml @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = read($1); +B = read($2); +W = read($3); + +s = cov(A, B, W); # three-argument cov; W is presumably the weights input (per the script name) - confirm +res = as.matrix(s); # cast the result to a matrix for the binary write below + +write(res, $4, format="binary"); diff --git a/src/test/scripts/functions/ooc/TSMM.dml b/src/test/scripts/functions/ooc/TSMM.dml index 432d2d9daab..2cea8f4226b 100644 --- a/src/test/scripts/functions/ooc/TSMM.dml +++ b/src/test/scripts/functions/ooc/TSMM.dml @@ -19,7 +19,7 @@ # #------------------------------------------------------------- -# Read input matrix and operator from command line args +# Read input matrix from command line args X = read($1); # Operation under test diff --git a/src/test/scripts/functions/ooc/TSMMRight.dml b/src/test/scripts/functions/ooc/TSMMRight.dml new file mode 100644 index 00000000000..37fd5d46b9d --- /dev/null +++ b/src/test/scripts/functions/ooc/TSMMRight.dml @@ -0,0 +1,28 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+# +#------------------------------------------------------------- + +# Read input matrix from command line args +X = read($1); + +# Operation under test: X multiplied by its own transpose +res = X %*% t(X); + +write(res, $2, format="binary")