Almost working
cgivre committed Sep 23, 2022
1 parent d67c57d commit 9a433b6
Showing 10 changed files with 50 additions and 266 deletions.
contrib/storage-druid/pom.xml: 2 changes (1 addition, 1 deletion)
@@ -29,7 +29,7 @@
<artifactId>drill-druid-storage</artifactId>
<name>Drill : Contrib : Storage : Druid</name>
<properties>
<druid.TestSuite>**/DruidTestSuit.class</druid.TestSuite>
<druid.TestSuite>**/DruidTestSuite.class</druid.TestSuite>
</properties>
<dependencies>
<dependency>
DruidBatchRecordReader.java
@@ -24,7 +24,6 @@
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
@@ -47,43 +46,30 @@

public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);

private static final ObjectMapper objectMapper = new ObjectMapper();

private final DruidStoragePlugin plugin;
private final DruidSubScan.DruidSubScanSpec scanSpec;
private final List<String> columns;
private final DruidFilter filter;
private final DruidQueryClient druidQueryClient;
private final FragmentContext fragmentContext;

private final DruidSubScan subScan;

private final TupleMetadata schema;
private BigInteger nextOffset = BigInteger.ZERO;
private int maxRecordsToRead = -1;

private JsonLoaderBuilder jsonBuilder;

private JsonLoader jsonLoader;
private ResultSetLoader resultSetLoader;

private CustomErrorContext errorContext;


public DruidBatchRecordReader(DruidSubScan subScan,
DruidSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
int maxRecordsToRead,
FragmentContext context,
DruidStoragePlugin plugin) {
this.subScan = subScan;
columns = new ArrayList<>();
this.columns = new ArrayList<>();
this.maxRecordsToRead = maxRecordsToRead;
this.plugin = plugin;
scanSpec = subScanSpec;
this.schema = subScan.getSchema();
fragmentContext = context;
this.scanSpec = subScanSpec;
this.filter = subScanSpec.getFilter();
this.druidQueryClient = plugin.getDruidQueryClient();
}
@@ -96,10 +82,9 @@ public boolean open(SchemaNegotiator negotiator) {

jsonBuilder = new JsonLoaderBuilder()
.resultSetLoader(resultSetLoader)
.standardOptions(negotiator.queryOptions())
.errorContext(errorContext);



return true;
}

@@ -112,9 +97,10 @@ public boolean next() {
setNextOffset(druidScanResponse);

for (ObjectNode eventNode : druidScanResponse.getEvents()) {
jsonLoader = jsonBuilder
.fromString(eventNode.asText())
JsonLoader jsonLoader = jsonBuilder
.fromString(eventNode.toString())
.build();

result = jsonLoader.readBatch();
}
return result;
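The key change in next() is feeding the JsonLoader with eventNode.toString() instead of eventNode.asText(). Below is a minimal sketch in plain Jackson (outside Drill; the sample field names are made up) showing why that matters: asText() on a container node such as ObjectNode falls back to an empty string, while toString() serializes the node back to JSON text that a JSON reader can actually parse.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class EventNodeDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // A stand-in for one Druid scan "event"; field names are illustrative only.
    ObjectNode event = (ObjectNode) mapper.readTree(
        "{\"__time\": 1663891200000, \"channel\": \"#en.wikipedia\"}");

    System.out.println(event.asText());   // prints an empty line: container nodes have no text value
    System.out.println(event.toString()); // prints {"__time":1663891200000,"channel":"#en.wikipedia"}
  }
}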
DruidGroupScan.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonIgnore;

import org.apache.calcite.avatica.Meta;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.metastore.MetadataProviderManager;
@@ -58,9 +57,7 @@ public class DruidGroupScan extends AbstractGroupScan {
private static final long DEFAULT_TABLET_SIZE = 1000;
private final DruidScanSpec scanSpec;
private final DruidStoragePlugin storagePlugin;

private MetadataProviderManager metadataProviderManager;

private final MetadataProviderManager metadataProviderManager;
private List<SchemaPath> columns;
private boolean filterPushedDown = false;
private int maxRecordsToRead;
@@ -293,6 +290,11 @@ public int getMaxRecordsToRead() {
return maxRecordsToRead;
}

@JsonIgnore
public MetadataProviderManager getMetadataProviderManager() {
return metadataProviderManager;
}

public TupleMetadata getSchema() {
if (metadataProviderManager == null) {
return null;
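Keeping metadataProviderManager as a final field and exposing it through the new @JsonIgnore getter lets the filter pushdown rule in the next file rebuild the group scan without dropping a provided schema. The diff is truncated right after the null check in getSchema(), so the body below is only a sketch of how other Drill group scans typically read a provided schema; the getSchemaProvider()/read() calls are an assumption, not code from this commit.

// Sketch only: assumes the surrounding DruidGroupScan fields and an import of
// java.io.IOException; the remainder of the real method is not shown in the diff.
public TupleMetadata getSchema() {
  if (metadataProviderManager == null) {
    return null;
  }
  try {
    // Read the schema supplied by the metastore or a provided-schema file, if any.
    return metadataProviderManager.getSchemaProvider().read().getSchema();
  } catch (IOException | NullPointerException e) {
    // No usable provided schema; fall back to schema discovery at read time.
    return null;
  }
}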
DruidPushDownFilterForScan.java
@@ -70,7 +70,8 @@ public void onMatch(RelOptRuleCall relOptRuleCall) {
groupScan.getStoragePlugin(),
newScanSpec,
groupScan.getColumns(),
groupScan.getMaxRecordsToRead());
groupScan.getMaxRecordsToRead(),
groupScan.getMetadataProviderManager());
newGroupsScan.setFilterPushedDown(true);

ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());

One of the changed files was deleted; the remaining file diffs are not shown here.
