Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre committed Sep 23, 2022
1 parent e46c579 commit d67c57d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private final DruidQueryClient druidQueryClient;
private final FragmentContext fragmentContext;

private final DruidSubScan subScan;

private final TupleMetadata schema;
private BigInteger nextOffset = BigInteger.ZERO;
private int maxRecordsToRead = -1;
Expand All @@ -69,17 +71,18 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private CustomErrorContext errorContext;


public DruidBatchRecordReader(DruidSubScanSpec subScanSpec,
public DruidBatchRecordReader(DruidSubScan subScan,
DruidSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
int maxRecordsToRead,
FragmentContext context,
DruidStoragePlugin plugin) {
this.subScan = subScan;
columns = new ArrayList<>();
setColumns(projectedColumns);
this.maxRecordsToRead = maxRecordsToRead;
this.plugin = plugin;
scanSpec = subScanSpec;
this.schema = scanSpec.getSchema();
this.schema = subScan.getSchema();
fragmentContext = context;
this.filter = subScanSpec.getFilter();
this.druidQueryClient = plugin.getDruidQueryClient();
Expand All @@ -89,8 +92,8 @@ public DruidBatchRecordReader(DruidSubScanSpec subScanSpec,
public boolean open(SchemaNegotiator negotiator) {
resultSetLoader = negotiator.build();
errorContext = negotiator.parentErrorContext();
negotiator.setErrorContext(errorContext);

negotiator
jsonBuilder = new JsonLoaderBuilder()
.resultSetLoader(resultSetLoader)
.errorContext(errorContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
Expand All @@ -41,12 +40,33 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;

public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {

private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);

/*
@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
SplunkSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
try {
ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
return builder.buildScanOperator(context, subScan);
} catch (UserException e) {
// Rethrow user exceptions directly
throw e;
} catch (Throwable e) {
// Wrap all others
throw new ExecutionSetupException(e);
}
}
*/


@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
Expand All @@ -55,6 +75,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc

for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) {
try {
ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan, context);
columns = subScan.getColumns();
readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine()));
} catch (Exception ex) {
Expand All @@ -67,7 +88,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubSc

private ScanFrameworkBuilder createBuilder(OptionManager options,
DruidSubScan subScan,
DruidSubScanSpec scanSpec) {
ExecutorFragmentContext context) {
ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
builder.projection(subScan.getColumns());
builder.providedSchema(subScan.getSchema());
Expand All @@ -76,26 +97,34 @@ private ScanFrameworkBuilder createBuilder(OptionManager options,
builder.errorContext(new ChildErrorContext(builder.errorContext()) {});

// Reader
ReaderFactory readerFactory = new DruidReaderFactory(subScan);
ReaderFactory readerFactory = new DruidReaderFactory(subScan, context);
builder.setReaderFactory(readerFactory);
builder.nullType(Types.optional(MinorType.VARCHAR));

return builder;
}

private static class DruidReaderFactory() implements ReaderFactory {

private static class DruidReaderFactory implements ReaderFactory {
private final DruidSubScan subScan;
public DruidReaderFactory(DruidSubScan subScan) {
private final ExecutorFragmentContext context;
private final Iterator<DruidSubScanSpec> subScanSpecIterator;
public DruidReaderFactory(DruidSubScan subScan, ExecutorFragmentContext context) {
this.subScan = subScan;
this.context = context;
this.subScanSpecIterator = subScan.getScanSpec().iterator();
}

@Override
// Intentional no-op: this reader factory does not retain the scan framework;
// readers are constructed from the sub-scan state held by the factory itself.
public void bind(ManagedScanFramework framework) { }

@Override
public ManagedReader<SchemaNegotiator> next() {
return new DruidBatchRecordReader();
return new DruidBatchRecordReader(subScan,
subScanSpecIterator.next(),
subScan.getColumns(),
subScan.getMaxRecordsToRead(),
context,
subScan.getStorageEngine());
}
}
}

0 comments on commit d67c57d

Please sign in to comment.