Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added Input Data POJO #1100

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;

import java.util.List;

/**
 * Base class for input data that is made up of two separate frames,
 * a primary and a secondary one.
 *
 * <p>The constructor delegates to the abstract {@link #setInputs(List)}, so
 * subclasses decide how the supplied list is mapped onto the two slots.
 * Because that hook runs from this constructor, implementations must not
 * depend on subclass fields that are initialised later.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public abstract class AMultiInputData<D, R, C> implements IData<D, R, C> {

    private ZFrame<D, R, C> primaryInput;
    private ZFrame<D, R, C> secondaryInput;

    /**
     * Creates the data holder and hands {@code inputs} to {@link #setInputs(List)}.
     *
     * @param inputs frames to distribute over the primary/secondary slots
     * @throws ZinggClientException if {@link #setInputs(List)} rejects the list
     */
    public AMultiInputData(List<ZFrame<D, R, C>> inputs) throws ZinggClientException {
        setInputs(inputs);
    }

    /**
     * Maps the supplied frames onto the primary and secondary inputs.
     *
     * @param inputs frames passed to the constructor
     * @throws ZinggClientException if the list cannot be used
     */
    protected abstract void setInputs(List<ZFrame<D, R, C>> inputs) throws ZinggClientException;

    /** @return the frame currently stored as primary input */
    public ZFrame<D, R, C> getPrimaryInput() {
        return this.primaryInput;
    }

    /** @param primaryInput frame to store as primary input */
    public void setPrimaryInput(ZFrame<D, R, C> primaryInput) {
        this.primaryInput = primaryInput;
    }

    /** @return the frame currently stored as secondary input */
    public ZFrame<D, R, C> getSecondaryInput() {
        return this.secondaryInput;
    }

    /** @param inputTwo frame to store as secondary input */
    public void setSecondaryInput(ZFrame<D, R, C> inputTwo) {
        this.secondaryInput = inputTwo;
    }

    /**
     * @return the result of {@code primaryInput.union(secondaryInput)}
     */
    @Override
    public ZFrame<D, R, C> getData() {
        return this.primaryInput.union(this.secondaryInput);
    }

    /** @return always {@link InputType#MULTI} */
    @Override
    public InputType getInputType() {
        return InputType.MULTI;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;

/**
 * {@link GenericData} subclass used as the carrier type for blocked frames.
 * It adds no state or behaviour of its own; it only gives the wrapped frame
 * a distinct type.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public class BlockedData<D, R, C> extends GenericData<D, R, C> {

    /**
     * @param data frame to wrap; passed unchanged to the {@link GenericData} constructor
     */
    public BlockedData(ZFrame<D, R, C> data) {
        super(data);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;

/**
 * Simple {@link IData} implementation that wraps a single frame.
 *
 * <p>This class can stand on its own and therefore is deliberately not
 * abstract.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public class GenericData<D, R, C> implements IData<D, R, C> {

    /** The wrapped frame; may be replaced through {@link #setData(ZFrame)}. */
    protected ZFrame<D, R, C> data;

    /**
     * @param data frame to wrap
     */
    public GenericData(ZFrame<D, R, C> data) {
        this.data = data;
    }

    /**
     * Replaces the wrapped frame.
     *
     * @param data new frame to wrap
     */
    public void setData(ZFrame<D, R, C> data) {
        this.data = data;
    }

    /**
     * @return the wrapped frame exactly as it was last set
     */
    @Override // implements IData#getData; annotated so signature drift is caught at compile time
    public ZFrame<D, R, C> getData() {
        return this.data;
    }

    /** @return always {@link InputType#SINGLE} */
    @Override
    public InputType getInputType() {
        return InputType.SINGLE;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;

/**
 * Abstraction over the data handed between processing steps.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public interface IData<D, R, C> {

    /**
     * @return the data as one frame
     */
    ZFrame<D, R, C> getData();

    /**
     * @return whether this data is backed by a single input or by multiple inputs
     */
    InputType getInputType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package zingg.common.client.data;

/**
 * Distinguishes data backed by one input frame from data backed by several.
 */
public enum InputType {
    /** Data backed by a single frame. */
    SINGLE,

    /** Data backed by more than one frame. */
    MULTI;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;

import java.util.List;

/**
 * Two-frame input data for the linker: the first list element becomes the
 * primary input and the second the secondary input.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public class LinkInputData<D, R, C> extends AMultiInputData<D, R, C> {

    /**
     * @param inputs frames to link; at least two are required
     * @throws ZinggClientException if {@code inputs} is null or has fewer than two elements
     */
    public LinkInputData(List<ZFrame<D, R, C>> inputs) throws ZinggClientException {
        super(inputs);
    }

    /**
     * Invoked from the superclass constructor; delegates to {@code setLinkerInputs}.
     *
     * @throws ZinggClientException if {@code inputs} is null or has fewer than two elements
     */
    @Override
    protected void setInputs(List<ZFrame<D, R, C>> inputs) throws ZinggClientException {
        setLinkerInputs(inputs);
    }

    /**
     * Assigns element 0 as primary and element 1 as secondary input.
     * Elements beyond the second are currently ignored.
     *
     * @throws ZinggClientException if {@code inputs} is null or has fewer than two elements
     */
    private void setLinkerInputs(List<ZFrame<D, R, C>> inputs) throws ZinggClientException {
        //TODO what if data contains > 2 pipes
        // Report a missing list through the declared exception instead of a bare NPE.
        if (inputs == null || inputs.size() < 2) {
            throw new ZinggClientException("Expected at least two inputs for linker");
        }
        setPrimaryInput(inputs.get(0));
        setSecondaryInput(inputs.get(1));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package zingg.common.client.data;

import zingg.common.client.ZFrame;

/**
 * {@link GenericData} subclass that gives single-frame match input its own
 * type. It adds no state or behaviour beyond what {@link GenericData} provides.
 *
 * @param <D> frame type parameter forwarded to {@code ZFrame}
 * @param <R> row type parameter forwarded to {@code ZFrame}
 * @param <C> column type parameter forwarded to {@code ZFrame}
 */
public class MatchInputData<D, R, C> extends GenericData<D, R, C> {

    /**
     * @param data frame to wrap; passed unchanged to the {@link GenericData} constructor
     */
    public MatchInputData(ZFrame<D, R, C> data) {
        super(data);
    }
}
29 changes: 18 additions & 11 deletions common/client/src/main/java/zingg/common/client/util/DSUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import zingg.common.client.MatchTypes;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.pipe.Pipe;

import java.util.ArrayList;
Expand Down Expand Up @@ -50,7 +51,14 @@ public ZFrame<D, R, C> getPrefixedColumnsDS(ZFrame<D, R, C> lines) {
}
}


/**
 * Combines the frames of all supplied blocked-data holders into one frame by
 * unioning them in array order.
 *
 * @param blockedData holders whose frames are to be combined; must contain at least one element
 * @return the first holder's frame unioned with every following holder's frame
 * @throws IllegalArgumentException if {@code blockedData} is null or empty
 */
public ZFrame<D, R, C> getBlockedDF(BlockedData<D, R, C>[] blockedData) {
    // Fail with a descriptive message rather than an NPE / ArrayIndexOutOfBoundsException on [0].
    if (blockedData == null || blockedData.length == 0) {
        throw new IllegalArgumentException("blockedData must contain at least one element");
    }
    ZFrame<D, R, C> blockedDataDF = blockedData[0].getData();
    for (int idx = 1; idx < blockedData.length; idx++) {
        blockedDataDF = blockedDataDF.union(blockedData[idx].getData());
    }
    return blockedDataDF;
}

public ZFrame<D, R, C> join(ZFrame<D, R, C> lines, ZFrame<D, R, C> lines1, String joinColumn, boolean filter) {
ZFrame<D, R, C> pairs = lines.join(lines1, joinColumn);
Expand Down Expand Up @@ -111,18 +119,17 @@ public ZFrame<D, R, C> joinWithItself(ZFrame<D, R, C> lines, String joinColumn,
ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines);
return join(lines, lines1, joinColumn, filter);
}

public ZFrame<D, R, C> joinWithItselfSourceSensitive(ZFrame<D, R, C> lines, String joinColumn, IArguments args) throws Exception {

ZFrame<D, R, C> lines1 = getPrefixedColumnsDS(lines);

String[] sourceNames = args.getPipeNames();
lines = lines.filter(lines.equalTo(ColName.SOURCE_COL, sourceNames[0]));
lines1 = lines1.filter(lines1.notEqual(ColName.COL_PREFIX + ColName.SOURCE_COL, sourceNames[0]));
return join(lines, lines1, joinColumn, false);

/**
 * Joins the frame of the first blocked input with the column-prefixed frame
 * of the second blocked input on {@code joinColumn}.
 *
 * @param blockedInputOne holder of the left-hand frame, used as is
 * @param blockedInputTwo holder of the right-hand frame, passed through {@code getPrefixedColumnsDS} first
 * @param joinColumn      column name handed to {@code join}
 * @param args            not read by this method
 * @return the result of {@code join(left, prefixedRight, joinColumn, false)}
 * @throws Exception declared by the signature
 */
public ZFrame<D, R, C> joinWithItselfSourceSensitive(BlockedData<D,R,C> blockedInputOne, BlockedData<D,R,C> blockedInputTwo, String joinColumn, IArguments args) throws Exception {
    final ZFrame<D, R, C> left = blockedInputOne.getData();
    final ZFrame<D, R, C> prefixedRight = getPrefixedColumnsDS(blockedInputTwo.getData());
    return join(left, prefixedRight, joinColumn, false);
}



public ZFrame<D, R, C> alignDupes(ZFrame<D, R, C> dupesActual, IArguments args) {
dupesActual = dupesActual.cache();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package zingg.common.core.block;

import zingg.common.client.IArguments;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.data.IData;
import zingg.common.client.data.InputType;
import zingg.common.client.data.AMultiInputData;
import zingg.common.client.util.IModelHelper;
import zingg.common.core.util.BlockingTreeUtil;

/**
 * {@link IBlockProvider} that delegates the actual blocking to an
 * {@link IBlocker} and returns one {@link BlockedData} per input frame.
 */
public class BlockProvider<S, D, R, C, T> implements IBlockProvider<S, D, R, C, T> {

    private final IBlocker<S,D,R,C,T> blocker;

    /**
     * @param blocker blocker every frame is handed to
     */
    public BlockProvider(IBlocker<S,D,R,C,T> blocker) {
        this.blocker = blocker;
    }

    /**
     * Blocks the supplied data.
     *
     * <p>For {@link InputType#SINGLE} data the result has one element, built
     * from {@code testData.getData()}. Otherwise {@code testData} is cast to
     * {@link AMultiInputData} and the result has two elements: the blocked
     * primary input followed by the blocked secondary input.
     *
     * @return array of blocked data in the order described above
     */
    @Override
    public BlockedData<D, R, C>[] getBlockedData(IData<D, R, C> testData, IArguments args, IModelHelper<D, R, C> modelHelper,
            BlockingTreeUtil<S, D, R, C, T> blockingTreeUtil) throws ZinggClientException, Exception {

        // Single input: one frame in, one blocked result out.
        if (testData.getInputType() == InputType.SINGLE) {
            return new BlockedData[]{blocker.getBlocked(testData.getData(), args, modelHelper, blockingTreeUtil)};
        }

        // Multi input: block each side separately, primary first.
        final AMultiInputData<D, R, C> multiInput = (AMultiInputData<D, R, C>) testData;
        final BlockedData<D, R, C> blockedPrimary =
                blocker.getBlocked(multiInput.getPrimaryInput(), args, modelHelper, blockingTreeUtil);
        final BlockedData<D, R, C> blockedSecondary =
                blocker.getBlocked(multiInput.getSecondaryInput(), args, modelHelper, blockingTreeUtil);
        return new BlockedData[]{blockedPrimary, blockedSecondary};
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.util.ColName;
import zingg.common.client.util.IModelHelper;
import zingg.common.core.util.BlockingTreeUtil;
Expand All @@ -31,12 +32,12 @@ public ZFrame<D,R,C> getBlocked(ZFrame<D,R,C> testData, IArguments args, IModelH
}

@Override
public ZFrame<D,R,C> getBlocked(ZFrame<D,R,C> testData, IArguments args, IModelHelper<D,R,C> imh, BlockingTreeUtil<S,D,R,C,T> bTreeUtil) throws Exception, ZinggClientException{
/**
 * Blocks {@code testData} using the stored blocking tree.
 *
 * <p>Logs the blocking tree pipe at WARN level, reads the tree through
 * {@code bTreeUtil}, computes block hashes for {@code testData},
 * repartitions the result using {@code args.getNumPartitions()} and the
 * {@code ColName.HASH_COL} column, caches it and wraps it in a
 * {@link BlockedData}.
 *
 * @param testData  frame to block
 * @param args      supplies the partition count and is passed to the tree lookup
 * @param imh       model helper used to locate and read the blocking tree
 * @param bTreeUtil utility that reads the tree and computes block hashes
 * @return the repartitioned, cached frame wrapped in {@link BlockedData}
 */
public BlockedData<D,R,C> getBlocked(ZFrame<D,R,C> testData, IArguments args, IModelHelper<D,R,C> imh, BlockingTreeUtil<S,D,R,C,T> bTreeUtil) throws Exception, ZinggClientException{
    LOG.warn("Blocking model location is " + imh.getBlockingTreePipe(args));
    Tree<Canopy<R>> tree = bTreeUtil.readBlockingTree(args, imh);
    ZFrame<D,R,C> blocked = bTreeUtil.getBlockHashes(testData, tree);
    ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
    // Single return of the declared type; the earlier bare-frame return is dropped
    // because it did not match BlockedData and made this statement unreachable.
    return new BlockedData<D, R, C>(blocked1);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package zingg.common.core.block;

import zingg.common.client.IArguments;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.data.IData;
import zingg.common.client.util.IModelHelper;
import zingg.common.core.util.BlockingTreeUtil;

/**
 * Produces blocked data for a given input.
 */
public interface IBlockProvider<S, D, R, C, T> {

    /**
     * Blocks the supplied data.
     *
     * @param testData         data to block
     * @param args             run arguments
     * @param modelHelper      model helper made available to the blocking step
     * @param blockingTreeUtil blocking tree utility made available to the blocking step
     * @return the blocked data as an array
     * @throws ZinggClientException declared by the signature
     * @throws Exception            declared by the signature
     */
    BlockedData<D, R, C>[] getBlockedData(
            IData<D, R, C> testData,
            IArguments args,
            IModelHelper<D, R, C> modelHelper,
            BlockingTreeUtil<S, D,R,C,T> blockingTreeUtil) throws ZinggClientException, Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.util.IModelHelper;
import zingg.common.core.util.BlockingTreeUtil;

/**
 * Blocks a single frame.
 */
public interface IBlocker<S,D,R,C,T> {

    /**
     * Blocks {@code testData} and returns the result wrapped in {@link BlockedData}.
     *
     * <p>Only this declaration is kept: a second {@code getBlocked} with the
     * same parameter list but a {@code ZFrame} return type cannot coexist
     * with it in one interface.
     *
     * @param testData  frame to block
     * @param args      run arguments
     * @param imh       model helper made available to the blocker
     * @param bTreeUtil blocking tree utility made available to the blocker
     * @return the blocked data
     * @throws Exception            declared by the signature
     * @throws ZinggClientException declared by the signature
     */
    BlockedData<D, R, C> getBlocked(ZFrame<D,R,C> testData, IArguments args, IModelHelper<D,R,C> imh, BlockingTreeUtil<S,D,R,C,T> bTreeUtil) throws Exception, ZinggClientException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.GenericData;
import zingg.common.client.data.IData;
import zingg.common.client.util.PipeUtilBase;
import zingg.common.core.match.data.IDataGetter;

Expand All @@ -26,8 +28,9 @@ public ZFrame<D,R,C> getTestData(IArguments args) throws ZinggClientException{
}

@Override
public ZFrame<D, R, C> getData(IArguments args, PipeUtilBase<S, D, R, C> p) throws ZinggClientException {
return p.read(true, true, args.getNumPartitions(), true, args.getData());
/**
 * Reads the input described by {@code args.getData()} through {@code p} and
 * wraps the resulting frame in a {@link GenericData}.
 *
 * @param args supplies the partition count and the data to read
 * @param p    pipe utility performing the read
 * @return the frame returned by {@code p.read(...)}, wrapped in {@link GenericData}
 * @throws ZinggClientException declared by the signature
 */
public IData<D, R, C> getData(IArguments args, PipeUtilBase<S, D, R, C> p) throws ZinggClientException {
    return new GenericData<D, R, C>(
            p.read(true, true, args.getNumPartitions(), true, args.getData()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package zingg.common.core.executor;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.LinkInputData;
import zingg.common.client.util.PipeUtilBase;
import zingg.common.core.match.data.IDataGetter;

import java.util.Arrays;

/**
 * {@link IDataGetter} for linking: reads the first two entries of
 * {@code arg.getData()} as separate frames and returns them as
 * {@link LinkInputData}.
 */
public class LinkDataGetter<S, D, R, C> implements IDataGetter<S, D, R, C> {

    /**
     * Reads entry 0 and entry 1 of {@code arg.getData()} individually through
     * {@code p} and bundles them, in that order, into a {@link LinkInputData}.
     *
     * @param arg supplies the partition count and the data entries to read
     * @param p   pipe utility performing the reads
     * @return link input holding the two frames that were read
     * @throws ZinggClientException if fewer than two data entries are configured
     */
    @Override
    public LinkInputData<D, R, C> getData(IArguments arg, PipeUtilBase<S, D, R, C> p) throws ZinggClientException {
        // Validate before indexing so a misconfiguration surfaces as the declared
        // exception rather than as ArrayIndexOutOfBoundsException / NPE.
        if (arg.getData() == null || arg.getData().length < 2) {
            throw new ZinggClientException("Linker requires at least two input data pipes");
        }
        ZFrame<D, R, C> sourceOneInput = p.read(true, true, arg.getNumPartitions(), true, arg.getData()[0]);
        ZFrame<D, R, C> sourceTwoInput = p.read(true, true, arg.getNumPartitions(), true, arg.getData()[1]);
        return new LinkInputData<D, R, C>(Arrays.asList(sourceOneInput, sourceTwoInput));
    }
}
34 changes: 32 additions & 2 deletions common/core/src/main/java/zingg/common/core/executor/Linker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@

import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.data.BlockedData;
import zingg.common.client.data.IData;
import zingg.common.client.data.LinkInputData;
import zingg.common.client.options.ZinggOptions;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.match.data.IDataGetter;
import zingg.common.core.match.output.IMatchOutputBuilder;
import zingg.common.core.match.output.LinkOutputBuilder;
import zingg.common.core.pairs.IPairBuilder;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

import java.util.Arrays;


public abstract class Linker<S,D,R,C,T> extends Matcher<S,D,R,C,T> {
Expand All @@ -30,7 +35,7 @@ public ZFrame<D,R,C> selectColsFromBlocked(ZFrame<D,R,C> blocked) {
}

@Override
protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> testData) throws Exception, ZinggClientException{
/**
 * Delegates to the five-argument {@code getActualDupes} overload with a
 * fresh {@link PredictionFilter}, the pair builder from
 * {@code getIPairBuilder()} and {@code null} as the last argument.
 *
 * @param blocked  blocked data to find duplicates in
 * @param testData input data passed through to the overload
 * @return whatever the five-argument overload returns
 */
protected ZFrame<D,R,C> getActualDupes(BlockedData<D,R,C>[] blocked, IData<D,R,C> testData) throws Exception, ZinggClientException{
    return getActualDupes(
            blocked,
            testData,
            new PredictionFilter<D, R, C>(),
            getIPairBuilder(),
            null);
}
Expand All @@ -56,5 +61,30 @@ public IPairBuilder<S, D, R, C> getIPairBuilder(){
}
return iPairBuilder;
}


/**
 * Returns the data getter, creating a {@link LinkDataGetter} the first time
 * the field is found to be {@code null}.
 *
 * @return the existing getter, or a newly created {@link LinkDataGetter}
 */
@Override
public IDataGetter<S, D, R, C> getDataGetter(){
    if (dataGetter != null) {
        return dataGetter;
    }
    this.dataGetter = new LinkDataGetter<S, D, R, C>();
    return dataGetter;
}

/**
 * Runs {@code preprocess} on the primary and then the secondary input of
 * the supplied link data and returns the results as new {@link LinkInputData}.
 *
 * @param inputData input to preprocess; cast to {@link LinkInputData}
 * @return new link input holding the two preprocessed frames, primary first
 * @throws ZinggClientException declared by the signature
 */
@Override
protected IData<D, R, C> getPreprocessedInputData(IData<D, R, C> inputData) throws ZinggClientException {
    final LinkInputData<D, R, C> linkInput = (LinkInputData<D, R, C>) inputData;
    final ZFrame<D, R, C> rawPrimary = linkInput.getPrimaryInput();
    final ZFrame<D, R, C> rawSecondary = linkInput.getSecondaryInput();
    // Arguments are evaluated left to right, so primary is preprocessed first.
    return new LinkInputData<D, R, C>(Arrays.asList(preprocess(rawPrimary), preprocess(rawSecondary)));
}

/**
 * Applies {@code getFieldDefColumnsDS} to the primary and then the secondary
 * input of the supplied link data and returns the results as new
 * {@link LinkInputData}.
 *
 * @param inputData input to transform; cast to {@link LinkInputData}
 * @return new link input holding the two transformed frames, primary first
 * @throws ZinggClientException declared by the signature
 */
@Override
protected IData<D, R, C> getFieldDefColumnsDF(IData<D, R, C> inputData) throws ZinggClientException {
    final LinkInputData<D, R, C> linkInput = (LinkInputData<D, R, C>) inputData;
    final ZFrame<D, R, C> rawPrimary = linkInput.getPrimaryInput();
    final ZFrame<D, R, C> rawSecondary = linkInput.getSecondaryInput();
    // Arguments are evaluated left to right, so primary is transformed first.
    return new LinkInputData<D, R, C>(
            Arrays.asList(getFieldDefColumnsDS(rawPrimary), getFieldDefColumnsDS(rawSecondary)));
}
}
Loading
Loading