diff --git a/README.md b/README.md
index 8c23a98a1..b361f0525 100644
--- a/README.md
+++ b/README.md
@@ -10,22 +10,23 @@ Python users may also be interested in PyDeequ, a Python interface for Deequ. Yo
 
 ## Requirements and Installation
 
-__Deequ__ depends on Java 8. We provide releases compatible with Apache Spark versions 2.2.x to 3.0.x. The Spark 2.2.x and 2.3.x releases depend on Scala 2.11 and the Spark 2.4.x and 3.0.x releases depend on Scala 2.12.
+__Deequ__ depends on Java 8. Deequ version 2.x runs only with Spark 3.1, and vice versa. If you rely on an older Spark version, please use a Deequ 1.x version (the legacy version is maintained in the legacy-spark-3.0 branch). We provide legacy releases compatible with Apache Spark versions 2.2.x to 3.0.x. The Spark 2.2.x and 2.3.x releases depend on Scala 2.11, and the Spark 2.4.x, 3.0.x, and 3.1.x releases depend on Scala 2.12.
 
 Available via [maven central](http://mvnrepository.com/artifact/com.amazon.deequ/deequ).
 
-Choose the latest release that matches your Spark version from the [available versions](https://repo1.maven.org/maven2/com/amazon/deequ/deequ/). Add the release as a dependency to your project. For example for spark 3.0.x:
+Choose the latest release that matches your Spark version from the [available versions](https://repo1.maven.org/maven2/com/amazon/deequ/deequ/). Add the release as a dependency to your project. For example, for Spark 3.1.x:
+
 __Maven__
 ```
 <dependency>
   <groupId>com.amazon.deequ</groupId>
   <artifactId>deequ</artifactId>
-  <version>1.2.2-spark-3.0</version>
+  <version>2.0.0-spark-3.1</version>
 </dependency>
 ```
 
 __sbt__
 ```
-libraryDependencies += "com.amazon.deequ" % "deequ" % "1.2.2-spark-3.0"
+libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.0-spark-3.1"
 ```
 
 ## Example
diff --git a/pom.xml b/pom.xml
index 127f24cd6..4a0df98d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,11 +13,10 @@
     1.8
     UTF-8
-    2.12
-    ${scala.major.version}.10
+    2.13
+    ${scala.major.version}.0
     ${scala.major.version}
-    4.4.0
-
+    4.5.6
     3.2.0
@@ -89,7 +88,7 @@
             org.scalanlp
             breeze_${scala.major.version}
-            0.13.2
+            2.0.1-RC1
@@ -147,6 +146,7 @@
                 src/main/scala
+                src/test/scala
@@ -235,9 +235,9 @@
                 org.apache.maven.plugins
                 maven-jar-plugin
                 3.2.0
-                
-                    ${project.artifactId}_${scala.major.version}-${project.version}
-                
+                
+                    ${project.artifactId}_${scala.major.version}-${project.version}
+                
diff --git a/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala b/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
index d8158f107..302fee94d 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
@@ -60,6 +60,23 @@ case class PatternMatch(column: String, pattern: Regex, where: Option[String] =
   override protected def additionalPreconditions(): Seq[StructType => Unit] = {
     hasColumn(column) :: isString(column) :: Nil
   }
+
+  // PatternMatch's hashCode can differ between objects with identical parameters,
+  // because Regex does not override equals/hashCode and so compares by reference.
+  // Fix this by comparing a tuple that includes the pattern string.
+  private val internalObj = (column, pattern.toString(), where)
+
+  override def hashCode(): Int = {
+    internalObj.hashCode()
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case o: PatternMatch => internalObj.equals(o.internalObj)
+      case _ => false
+    }
+  }
+
 }
 
 object Patterns {
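The PatternMatch hunk above exists because `scala.util.matching.Regex` does not override `equals`/`hashCode`, so two analyzers built from identical pattern strings hash differently. A minimal standalone sketch of that behavior (not part of the patch; the object and value names are illustrative):

```scala
import scala.util.matching.Regex

object RegexEqualityDemo extends App {
  val a: Regex = "[a-z]".r
  val b: Regex = "[a-z]".r

  println(a == b)                   // false: Regex falls back to reference equality
  println(a.toString == b.toString) // true: the underlying pattern strings match

  // The patch's workaround: compare tuples that carry the pattern string.
  val keyA = ("col1", a.toString, None)
  val keyB = ("col1", b.toString, None)
  println(keyA == keyB)             // true: tuples compare element-wise
}
```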
diff --git a/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala b/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
index a39b2d672..8652aec66 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/QuantileNonSample.scala
@@ -21,7 +21,6 @@
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.util.control.Breaks._
-
 class QuantileNonSample[T](
     var sketchSize: Int,
     var shrinkingFactor: Double = 0.64)
@@ -52,7 +51,7 @@ class QuantileNonSample[T](
     this.shrinkingFactor = shrinkingFactor
     compactors = ArrayBuffer.fill(data.length)(new NonSampleCompactor[T])
     for (i <- data.indices) {
-      compactors(i).buffer = data(i).to[ArrayBuffer]
+      compactors(i).buffer = data(i).to(ArrayBuffer)
     }
     curNumOfCompactors = data.length
     compactorActualSize = getCompactorItemsCount
diff --git a/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala b/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
index 65edb9424..b34c58fca 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, DeequHyperLogLogPlusPlusUtils}
 import org.apache.spark.sql.SaveMode
 
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
 import scala.util.hashing.MurmurHash3
 
 private object StateInformation {
@@ -58,7 +58,7 @@ case class InMemoryStateProvider() extends StateLoader with StatePersister {
   override def toString: String = {
     val buffer = new StringBuilder()
 
-    statesByAnalyzer.foreach { case (analyzer, state) =>
+    statesByAnalyzer.asScala.foreach { case (analyzer, state) =>
       buffer.append(analyzer.toString)
       buffer.append(" => ")
       buffer.append(state.toString)
diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala b/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala
index 8bf8b634c..c5ae4d062 100644
--- a/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala
+++ b/src/main/scala/com/amazon/deequ/anomalydetection/OnlineNormalStrategy.scala
@@ -116,7 +116,7 @@ case class OnlineNormalStrategy(
         ret += OnlineCalculationResult(currentMean, stdDev, isAnomaly = true)
       }
     }
-    ret
+    ret.toSeq
  }
diff --git a/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala b/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala
index 0ee0ac25f..dc19ab275 100644
--- a/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala
+++ b/src/main/scala/com/amazon/deequ/anomalydetection/seasonal/HoltWinters.scala
@@ -132,7 +132,7 @@ class HoltWinters(
     }
 
     val forecasted = Y.drop(series.size)
-    ModelResults(forecasted, level, trend, seasonality, residuals)
+    ModelResults(forecasted.toSeq, level.toSeq, trend.toSeq, seasonality.toSeq, residuals.toSeq)
   }
 
   private def modelSelectionFor(
diff --git a/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala b/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
index 43e23fadc..1df5dbafb 100644
--- a/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
+++ b/src/main/scala/com/amazon/deequ/profiles/ColumnProfiler.scala
@@ -317,7 +317,7 @@ object ColumnProfiler {
           Histogram(histogram.column).equals(histogram)
         case _ => false
       }
-      analyzerContextExistingValues = AnalyzerContext(relevantEntries)
+      analyzerContextExistingValues = AnalyzerContext(relevantEntries.toMap)
     }
   }
 }
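Many hunks in this patch append `.toSeq` or `.toMap`. That is the Scala 2.13 collections redesign at work: `scala.Seq` is now an alias for `scala.collection.immutable.Seq`, so a mutable builder such as `ArrayBuffer` no longer satisfies a `Seq` return type and must be copied explicitly. A minimal sketch of the pattern (standalone, illustrative names, standard library only):

```scala
import scala.collection.mutable.ArrayBuffer

object ToSeqMigrationDemo extends App {
  def results(): Seq[Int] = {
    val ret = ArrayBuffer[Int]()
    ret += 1
    ret += 2
    // Returning `ret` directly compiled on 2.12, where scala.Seq meant
    // collection.Seq; on 2.13 an explicit immutable copy is required.
    ret.toSeq
  }

  println(results()) // List(1, 2)
}
```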
diff --git a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
index 3f5762bab..5d48c57d7 100644
--- a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
+++ b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala
@@ -19,7 +19,6 @@ package com.amazon.deequ.repository
 import java.lang.reflect.Type
 
 import com.amazon.deequ.analyzers.{State, _}
-import org.apache.spark.sql.functions._
 import com.amazon.deequ.metrics.{Distribution, Metric, _}
 import util.{Failure, Success, Try}
 
@@ -28,13 +27,11 @@ import com.google.gson._
 import com.google.gson.reflect.TypeToken
 
 import scala.collection._
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
 
 import JsonSerializationConstants._
 
-import scala.collection.JavaConversions._
-
 private[repository] object JsonSerializationConstants {
   val STRING_MAP_TYPE: Type = new TypeToken[JList[JMap[String, Any]]]() {}.getType
@@ -413,22 +410,22 @@ private[deequ] object AnalyzerDeserializer
           getOptionalWhereParam(json))
 
       case "CountDistinct" =>
-        CountDistinct(getColumnsAsSeq(context, json))
+        CountDistinct(getColumnsAsSeq(context, json).toSeq)
 
       case "Distinctness" =>
-        Distinctness(getColumnsAsSeq(context, json))
+        Distinctness(getColumnsAsSeq(context, json).toSeq)
 
       case "Entropy" =>
         Entropy(json.get(COLUMN_FIELD).getAsString)
 
       case "MutualInformation" =>
-        MutualInformation(getColumnsAsSeq(context, json))
+        MutualInformation(getColumnsAsSeq(context, json).toSeq)
 
       case "UniqueValueRatio" =>
-        UniqueValueRatio(getColumnsAsSeq(context, json))
+        UniqueValueRatio(getColumnsAsSeq(context, json).toSeq)
 
       case "Uniqueness" =>
-        Uniqueness(getColumnsAsSeq(context, json))
+        Uniqueness(getColumnsAsSeq(context, json).toSeq)
 
       case "Histogram" =>
         Histogram(
@@ -571,7 +568,7 @@ private[deequ] object MetricDeserializer extends JsonDeserializer[Metric[_]] {
         val instance = jsonObject.get("instance").getAsString
         if (jsonObject.has("value")) {
           val entries = jsonObject.get("value").getAsJsonObject
-          val values = entries.entrySet().map { entry =>
+          val values = entries.entrySet().asScala.map { entry =>
             entry.getKey -> entry.getValue.getAsDouble
           }
           .toMap
diff --git a/src/main/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoader.scala b/src/main/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoader.scala
index 3b9abc5b3..8351fe0b1 100644
--- a/src/main/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoader.scala
+++ b/src/main/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoader.scala
@@ -99,8 +99,8 @@ private[repository] object MetricsRepositoryMultipleResultsLoader {
 
   def jsonUnion(jsonOne: String, jsonTwo: String): String = {
 
-    val objectOne: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonOne)
-    val objectTwo: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonTwo)
+    val objectOne: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonOne).toSeq
+    val objectTwo: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonTwo).toSeq
 
     val columnsTotal = objectOne.headOption.getOrElse(Map.empty).keySet ++
       objectTwo.headOption.getOrElse(Map.empty).keySet
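The import swap from `scala.collection.JavaConversions._` to `scala.jdk.CollectionConverters._` recurs across these files. JavaConversions, removed in Scala 2.13, converted Java collections implicitly; CollectionConverters requires an explicit `.asScala`/`.asJava`, which is why calls like `statesByAnalyzer.asScala.foreach` and `entries.entrySet().asScala.map` appear in the hunks above. A minimal sketch (illustrative names, standard library only):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object ConvertersDemo extends App {
  val statesByName = new ConcurrentHashMap[String, Double]()
  statesByName.put("Size(*)", 100.0)

  // Where 2.12 code iterated the Java map via an implicit JavaConversions
  // conversion, 2.13 requires the conversion to be spelled out:
  statesByName.asScala.foreach { case (name, value) =>
    println(s"$name => $value")
  }
}
```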
diff --git a/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala
index a45626b31..70d78993a 100644
--- a/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala
+++ b/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala
@@ -147,10 +147,10 @@ class FileSystemMetricsRepositoryMultipleResultsLoader(
           .metricMap
           .filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))
 
-        val requestedAnalyzerContext = AnalyzerContext(requestedMetrics)
+        val requestedAnalyzerContext = AnalyzerContext(requestedMetrics.toMap)
 
         AnalysisResult(analysisResult.resultKey, requestedAnalyzerContext)
-      }
+      }.toSeq
   }
 }
diff --git a/src/main/scala/com/amazon/deequ/repository/memory/InMemoryMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/memory/InMemoryMetricsRepository.scala
index 61ad1e1ee..26f9ccd60 100644
--- a/src/main/scala/com/amazon/deequ/repository/memory/InMemoryMetricsRepository.scala
+++ b/src/main/scala/com/amazon/deequ/repository/memory/InMemoryMetricsRepository.scala
@@ -21,7 +21,7 @@ import com.amazon.deequ.metrics.Metric
 import com.amazon.deequ.repository._
 import com.amazon.deequ.analyzers.runners.AnalyzerContext
 
-import scala.collection.JavaConversions._
+import scala.jdk.CollectionConverters._
 import java.util.concurrent.ConcurrentHashMap
 
 /** A simple Repository implementation backed by a concurrent hash map */
@@ -117,7 +117,7 @@ class LimitedInMemoryMetricsRepositoryMultipleResultsLoader(
 
   /** Get the AnalysisResult */
   def get(): Seq[AnalysisResult] = {
-    resultsRepository
+    resultsRepository.asScala
       .filterKeys(key => after.isEmpty || after.get <= key.dataSetDate)
       .filterKeys(key => before.isEmpty || key.dataSetDate <= before.get)
       .filterKeys(key => tagValues.isEmpty || tagValues.get.toSet.subsetOf(key.tags.toSet))
@@ -129,7 +129,7 @@ class LimitedInMemoryMetricsRepositoryMultipleResultsLoader(
           .metricMap
           .filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))
 
-        AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics))
+        AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics.toMap))
       }
       .toSeq
   }
diff --git a/src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala b/src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala
index b969e1ea9..023f5954b 100644
--- a/src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala
+++ b/src/main/scala/com/amazon/deequ/suggestions/ConstraintSuggestionRunner.scala
@@ -132,7 +132,7 @@ class ConstraintSuggestionRunner {
       groupedSuggestionsWithColumnNames.map { case (_, suggestion) => suggestion }
     }
 
     ConstraintSuggestionResult(columnProfiles.profiles, columnProfiles.numRecords,
-      columnsWithSuggestions, verificationResult)
+      columnsWithSuggestions.toMap, verificationResult)
   }
 
   private[this] def splitTrainTestSets(
diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
index 6580fefc1..999447209 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
@@ -679,6 +679,13 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with
   "Pattern compliance analyzer" should {
     val someColumnName = "some"
 
+    "PatternMatch hashCode should be equal for the same pattern" in {
+      val p1 = PatternMatch("col1", "[a-z]".r)
+      val p2 = PatternMatch("col1", "[a-z]".r)
+      p1.hashCode() should equal(p2.hashCode())
+      p1 should equal(p2)
+    }
+
     "not match doubles in nullable column" in withSparkSession { sparkSession =>
       val df = dataFrameWithColumn(someColumnName, DoubleType, sparkSession, Row(1.1),
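A related 2.13 detail behind the `AnalyzerContext(requestedMetrics.toMap)` changes in the repository loaders: `Map#filterKeys` now returns a lazy `MapView` (and is deprecated in favor of `.view.filterKeys`), so the view must be materialized with `.toMap` before being handed to an API that expects a strict `Map`. A minimal sketch with made-up metric names:

```scala
object FilterKeysDemo extends App {
  val metrics = Map("Completeness(id)" -> 1.0, "Size(*)" -> 100.0)

  // In 2.13, filterKeys is a deprecated alias for .view.filterKeys and
  // yields a lazy view, not a Map:
  val view = metrics.view.filterKeys(_.startsWith("Completeness"))

  // Materialize before passing it on, mirroring the .toMap calls above:
  val strict: Map[String, Double] = view.toMap
  println(strict) // Map(Completeness(id) -> 1.0)
}
```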