Skip to content

Commit

Permalink
Build against Druid 0.9.1. (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm authored and fjy committed Jun 29, 2016
1 parent 69c37ac commit 49bd6df
Show file tree
Hide file tree
Showing 19 changed files with 89 additions and 79 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ val jacksonOneVersion = "1.9.13"
// See https://github.com/druid-io/druid/pull/1669, https://github.com/druid-io/tranquility/pull/81 before upgrading Jackson
val jacksonTwoVersion = "2.4.6"
val jacksonTwoModuleScalaVersion = "2.4.5"
val druidVersion = "0.9.0"
val druidVersion = "0.9.1"
val guiceVersion = "4.0"
val flinkVersion = "1.0.3"
val finagleVersion = "6.31.0"
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ import com.metamx.tranquility.typeclass.Timestamper
import com.twitter.finagle.Service
import io.druid.data.input.ByteBufferInputRowParser
import io.druid.data.input.InputRow
import io.druid.data.input.impl.DimensionSchema
import io.druid.data.input.impl.DimensionSchema.ValueType
import io.druid.data.input.impl.InputRowParser
import io.druid.data.input.impl.MapInputRowParser
import io.druid.data.input.impl.StringInputRowParser
Expand Down Expand Up @@ -90,7 +92,7 @@ import scala.reflect.runtime.universe.typeTag
* .curator(curator)
* .discoveryPath("/test/discovery")
* .location(DruidLocation(new DruidEnvironment("druid:local:indexer", "druid:local:firehose:%s"), dataSource))
* .rollup(DruidRollup(dimensions, aggregators, QueryGranularity.MINUTE))
* .rollup(DruidRollup(dimensions, aggregators, QueryGranularities.MINUTE))
* .tuning(new ClusteredBeamTuning(Granularity.HOUR, 10.minutes, 1, 1))
* .buildTranquilizer()
* val future = sender.send(Map("timestamp" -> "2010-01-02T03:04:05.678Z", "bar" -> "hey", "baz" -> 3))
Expand Down Expand Up @@ -320,7 +322,16 @@ object DruidBeams
)
case _ =>
SpecificDruidDimensions(
j2s(parseSpec.getDimensionsSpec.getDimensions),
j2s(parseSpec.getDimensionsSpec.getDimensions) filter { dimensionSchema =>
// Spatial dimensions are handled as a special case, above
dimensionSchema.getTypeName != DimensionSchema.SPATIAL_TYPE_NAME
} map { dimensionSchema =>
dimensionSchema.getValueType match {
case ValueType.STRING => dimensionSchema.getName
case other =>
throw new IllegalStateException("Dimensions of type[%s] are not supported" format other)
}
},
spatialDimensions
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.metamx.tranquility.druid.SchemalessDruidDimensions;
import com.metamx.tranquility.druid.SpecificDruidDimensions;
import com.metamx.tranquility.finagle.FinagleRegistryConfig;
import io.druid.granularity.QueryGranularity;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.joda.time.Period;
Expand All @@ -52,7 +52,7 @@ public void testSpecificDimensionsRollupConfiguration() throws Exception
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.specific(dimensions),
aggregators,
QueryGranularity.MINUTE
QueryGranularities.MINUTE
);
Assert.assertTrue(rollup.dimensions() instanceof SpecificDruidDimensions);
Assert.assertEquals("column", ((SpecificDruidDimensions) rollup.dimensions()).dimensions().iterator().next());
Expand All @@ -64,7 +64,7 @@ public void testSchemalessDimensionsRollupConfiguration() throws Exception
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.schemaless(),
aggregators,
QueryGranularity.MINUTE
QueryGranularities.MINUTE
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Assert.assertEquals(0, ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().size());
Expand All @@ -76,7 +76,7 @@ public void testSchemalessDimensionsWithExclusionsRollupConfiguration() throws E
final DruidRollup rollup = DruidRollup.create(
DruidDimensions.schemalessWithExclusions(dimensions),
aggregators,
QueryGranularity.MINUTE
QueryGranularities.MINUTE
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next());
Expand All @@ -96,7 +96,7 @@ public void testSchemalessDimensionsWithExclusionsAndSpatialDimensionsRollupConf
)
),
aggregators,
QueryGranularity.MINUTE
QueryGranularities.MINUTE
);
Assert.assertTrue(rollup.dimensions() instanceof SchemalessDruidDimensions);
Assert.assertEquals("column", ((SchemalessDruidDimensions) rollup.dimensions()).dimensionExclusions().iterator().next());
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/direct-druid-test-smile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dataSources:
type: string
parseSpec:
format: csv
columns: [ts, foo, bar, lat, lon]
columns: [ts, foo, bar, lat, lon, coord.geo]
timestampSpec:
column: ts
format: posix
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/direct-druid-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ dataSources:
type: string
parseSpec:
format: csv
columns: [ts, foo, bar, lat, lon]
columns: [ts, foo, bar, lat, lon, coord.geo]
timestampSpec:
column: ts
format: posix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,14 @@

package com.metamx.tranquility.druid.input

import com.metamx.common.scala.Jackson
import com.metamx.common.scala.untyped.Dict
import com.metamx.tranquility.druid.DruidRollup
import com.metamx.tranquility.druid.SpecificDruidDimensions
import com.metamx.tranquility.partition.MapPartitioner
import com.metamx.tranquility.typeclass.Timestamper
import io.druid.data.input.MapBasedInputRow
import io.druid.data.input.impl.DimensionsSpec
import io.druid.data.input.impl.MapInputRowParser
import io.druid.data.input.impl.TimeAndDimsParseSpec
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.query.aggregation.DoubleSumAggregatorFactory
import io.druid.granularity.QueryGranularities
import org.joda.time.DateTime
import org.scala_tools.time.Imports._
import org.scalatest.FunSuite
import org.scalatest.ShouldMatchers
import scala.collection.JavaConverters._
Expand All @@ -54,13 +47,11 @@ class InputRowPartitionerTest extends FunSuite with ShouldMatchers
val parser = new MapInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("t", "iso", null),
new DimensionsSpec(Seq("foo", "bar", "baz").asJava, null, null)
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Seq("foo", "bar", "baz").asJava), null, null)
)
)

val partitioner = new InputRowPartitioner(
QueryGranularity.MINUTE
)
val partitioner = new InputRowPartitioner(QueryGranularities.MINUTE)

val same = Seq(
Dict("t" -> new DateTime("2000T00:00:03"), "foo" -> 1, "bar" -> Seq("y", "z")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package com.metamx.tranquility.test

import _root_.io.druid.data.input.impl.TimestampSpec
import _root_.io.druid.granularity.QueryGranularity
import _root_.io.druid.query.aggregation.LongSumAggregatorFactory
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.runtime.universe.typeTag
Expand Down Expand Up @@ -58,6 +57,7 @@ import com.twitter.util.Future
import com.twitter.util.NonFatal
import com.twitter.util.Return
import com.twitter.util.Throw
import _root_.io.druid.granularity.QueryGranularities
import java.io.ByteArrayInputStream
import java.nio.ByteBuffer
import java.{util => ju}
Expand Down Expand Up @@ -96,7 +96,7 @@ object DirectDruidTest
Vector(MultipleFieldDruidSpatialDimension("coord.geo", Seq("lat", "lon")))
),
IndexedSeq(new LongSumAggregatorFactory("barr", "bar")),
QueryGranularity.MINUTE
QueryGranularities.MINUTE
)
val druidEnvironment = new DruidEnvironment(
"druid/tranquility/indexer" /* Slashes should be converted to colons */ ,
Expand Down
28 changes: 14 additions & 14 deletions core/src/test/scala/com/metamx/tranquility/test/DruidBeamTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ import com.metamx.tranquility.druid.DruidRollup
import com.metamx.tranquility.druid.DruidSpatialDimension
import com.metamx.tranquility.druid.DruidTuning
import com.metamx.tranquility.druid.SpecificDruidDimensions
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.indexing.common.task.RealtimeIndexTask
import io.druid.indexing.common.task.Task
import io.druid.query.aggregation.LongSumAggregatorFactory
import io.druid.segment.realtime.firehose.ChatHandlerProvider
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider
import io.druid.server.metrics.EventReceiverFirehoseRegister
import io.druid.timeline.partition.LinearShardSpec
import _root_.io.druid.data.input.impl.TimestampSpec
import _root_.io.druid.granularity.QueryGranularities
import _root_.io.druid.indexing.common.task.RealtimeIndexTask
import _root_.io.druid.indexing.common.task.Task
import _root_.io.druid.query.aggregation.LongSumAggregatorFactory
import _root_.io.druid.segment.realtime.firehose.ChatHandlerProvider
import _root_.io.druid.segment.realtime.firehose.ClippedFirehoseFactory
import _root_.io.druid.segment.realtime.firehose.NoopChatHandlerProvider
import _root_.io.druid.server.metrics.EventReceiverFirehoseRegister
import _root_.io.druid.timeline.partition.LinearShardSpec
import org.joda.time.chrono.ISOChronology
import org.scala_tools.time.Imports._
import org.scalatest.FunSuite
import org.scalatest.Matchers
import scala.collection.JavaConverters._
import _root_.scala.collection.JavaConverters._

class DruidBeamTest extends FunSuite with Matchers
{
Expand Down Expand Up @@ -123,7 +123,7 @@ class DruidBeamTest extends FunSuite with Matchers
DruidRollup(
dimensions = SpecificDruidDimensions(Seq("dim1", "dim2"), Seq(DruidSpatialDimension.singleField("spatial1"))),
aggregators = Seq(new LongSumAggregatorFactory("met1", "met1")),
indexGranularity = QueryGranularity.MINUTE
indexGranularity = QueryGranularities.MINUTE
),
new TimestampSpec("ts", "iso", null),
null,
Expand Down Expand Up @@ -194,12 +194,12 @@ class DruidBeamTest extends FunSuite with Matchers
dataSchema.getDataSource should be("mydatasource")
dataSchema.getAggregators.deep should be(Array(new LongSumAggregatorFactory("met1", "met1")).deep)
dataSchema.getGranularitySpec.getSegmentGranularity should be(Granularity.HOUR)
dataSchema.getGranularitySpec.getQueryGranularity should be(QueryGranularity.MINUTE)
dataSchema.getGranularitySpec.getQueryGranularity should be(QueryGranularities.MINUTE)

val parseSpec = dataSchema.getParser.getParseSpec
parseSpec.getTimestampSpec.getTimestampColumn should be("ts")
parseSpec.getTimestampSpec.getTimestampFormat should be("iso")
parseSpec.getDimensionsSpec.getDimensions.asScala should be(Seq("dim1", "dim2"))
parseSpec.getDimensionsSpec.getDimensions.asScala.map(_.getName) should be(Seq("dim1", "dim2", "spatial1"))
parseSpec.getDimensionsSpec.getSpatialDimensions.asScala.map(_.getDimName) should be(Seq("spatial1"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@

package com.metamx.tranquility.test

import com.metamx.common.parsers.ParseException
import com.metamx.tranquility.druid.DruidRollup
import com.metamx.tranquility.druid.SchemalessDruidDimensions
import com.metamx.tranquility.druid.SpecificDruidDimensions
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.CountAggregatorFactory
import io.druid.query.aggregation.LongSumAggregatorFactory
import org.scalatest.FunSuite
Expand All @@ -37,7 +36,7 @@ class DruidRollupTest extends FunSuite with Matchers
val rollup = DruidRollup(
SpecificDruidDimensions(Vector("hey", "what"), Vector.empty),
Seq(new CountAggregatorFactory("heyyo")),
QueryGranularity.NONE
QueryGranularities.NONE
)
rollup.validate()
}
Expand All @@ -47,7 +46,7 @@ class DruidRollupTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Vector("hey", "what"), Vector.empty),
Seq(new CountAggregatorFactory("hey")),
QueryGranularity.NONE
QueryGranularities.NONE
)
}
e.getMessage should be("Duplicate columns: hey")
Expand All @@ -58,7 +57,7 @@ class DruidRollupTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Vector("what"), Vector.empty),
Seq(new CountAggregatorFactory("hey"), new LongSumAggregatorFactory("hey", "blah")),
QueryGranularity.NONE
QueryGranularities.NONE
)
}
e.getMessage should be("Duplicate columns: hey")
Expand All @@ -69,7 +68,7 @@ class DruidRollupTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Vector("what", "what"), Vector.empty),
Seq(new CountAggregatorFactory("hey")),
QueryGranularity.NONE
QueryGranularities.NONE
)
}
e.getMessage should be("Duplicate columns: what")
Expand All @@ -79,7 +78,7 @@ class DruidRollupTest extends FunSuite with Matchers
val rollup = DruidRollup(
SpecificDruidDimensions(Vector("e", "f", "a", "b", "z", "t"), Vector.empty),
Seq(new CountAggregatorFactory("hey")),
QueryGranularity.NONE
QueryGranularities.NONE
)
rollup.dimensions.specMap.get("dimensions").asInstanceOf[java.util.List[String]].asScala should
be(Seq("e", "f", "a", "b", "z", "t"))
Expand All @@ -89,7 +88,7 @@ class DruidRollupTest extends FunSuite with Matchers
val rollup = DruidRollup(
SpecificDruidDimensions(Seq("foo", "bar")),
Seq(new LongSumAggregatorFactory("hey", "there")),
QueryGranularity.NONE
QueryGranularities.NONE
)
val timestampSpec = new TimestampSpec("t", "auto", null)
rollup.isStringDimension(timestampSpec, "t") should be(false)
Expand All @@ -105,7 +104,7 @@ class DruidRollupTest extends FunSuite with Matchers
val rollup = DruidRollup(
SchemalessDruidDimensions(Set("qux")),
Seq(new LongSumAggregatorFactory("hey", "there")),
QueryGranularity.NONE
QueryGranularities.NONE
)
val timestampSpec = new TimestampSpec("t", "auto", null)
rollup.isStringDimension(timestampSpec, "t") should be(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.metamx.tranquility.druid.SpecificDruidDimensions
import com.metamx.tranquility.partition.MapPartitioner
import com.metamx.tranquility.typeclass.Timestamper
import io.druid.data.input.impl.TimestampSpec
import io.druid.granularity.QueryGranularity
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.DoubleSumAggregatorFactory
import java.{util => ju}
import org.joda.time.DateTime
Expand Down Expand Up @@ -69,7 +69,7 @@ class MapPartitionerTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Seq("foo", "bar", "baz")),
Seq(new DoubleSumAggregatorFactory("x", "xSum")),
QueryGranularity.MINUTE
QueryGranularities.MINUTE
)
)

Expand Down Expand Up @@ -106,7 +106,7 @@ class MapPartitionerTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Seq("foo", "bar", "baz")),
Seq(new DoubleSumAggregatorFactory("x", "xSum")),
QueryGranularity.MINUTE
QueryGranularities.MINUTE
)
)

Expand Down Expand Up @@ -146,7 +146,7 @@ class MapPartitionerTest extends FunSuite with Matchers
DruidRollup(
SpecificDruidDimensions(Seq("foo", "bar", "baz")),
Seq(new DoubleSumAggregatorFactory("x", "xSum")),
QueryGranularity.MINUTE
QueryGranularities.MINUTE
)
)

Expand Down
Loading

0 comments on commit 49bd6df

Please sign in to comment.