
Tammruka/2.0.0 spark 3.2.0 #403

Open
wants to merge 12 commits into base: tammruka/2.0.0-spark-3.2.0
9 changes: 5 additions & 4 deletions README.md
@@ -10,22 +10,23 @@ Python users may also be interested in PyDeequ, a Python interface for Deequ.

## Requirements and Installation

__Deequ__ depends on Java 8. We provide releases compatible with Apache Spark versions 2.2.x to 3.0.x. The Spark 2.2.x and 2.3.x releases depend on Scala 2.11 and the Spark 2.4.x and 3.0.x releases depend on Scala 2.12.
__Deequ__ depends on Java 8. Deequ version 2.x only runs with Spark 3.1, and vice versa. If you rely on an earlier Spark version, please use a Deequ 1.x release (the legacy line is maintained in the legacy-spark-3.0 branch). We provide legacy releases compatible with Apache Spark versions 2.2.x to 3.0.x. The Spark 2.2.x and 2.3.x releases depend on Scala 2.11, and the Spark 2.4.x, 3.0.x, and 3.1.x releases depend on Scala 2.12.

Available via [maven central](http://mvnrepository.com/artifact/com.amazon.deequ/deequ).

Choose the latest release that matches your Spark version from the [available versions](https://repo1.maven.org/maven2/com/amazon/deequ/deequ/). Add the release as a dependency to your project. For example for spark 3.0.x:
Choose the latest release that matches your Spark version from the [available versions](https://repo1.maven.org/maven2/com/amazon/deequ/deequ/). Add the release as a dependency to your project. For example, for Spark 3.1.x:

__Maven__
```
<dependency>
<groupId>com.amazon.deequ</groupId>
<artifactId>deequ</artifactId>
<version>1.2.2-spark-3.0</version>
<version>2.0.0-spark-3.1</version>
</dependency>
```
__sbt__
```
libraryDependencies += "com.amazon.deequ" % "deequ" % "1.2.2-spark-3.0"
libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.0-spark-3.1"
```

## Example
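The full example section is collapsed in this diff; for a quick impression of the API, here is a minimal verification sketch (it assumes an existing `SparkSession` and a `DataFrame` called `df`; the column name is illustrative):

```
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

// Verify that df is non-empty and that the "id" column is never null
val verificationResult = VerificationSuite()
  .onData(df)
  .addCheck(
    Check(CheckLevel.Error, "basic checks")
      .hasSize(_ > 0)
      .isComplete("id"))
  .run()

if (verificationResult.status != CheckStatus.Success) {
  println("Checks failed!")
}
```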
16 changes: 8 additions & 8 deletions pom.xml
@@ -13,11 +13,10 @@
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>

<scala.major.version>2.12</scala.major.version>
<scala.version>${scala.major.version}.10</scala.version>
<scala.major.version>2.13</scala.major.version>
<scala.version>${scala.major.version}.0</scala.version>
<artifact.scala.version>${scala.major.version}</artifact.scala.version>
<scala-maven-plugin.version>4.4.0</scala-maven-plugin.version>

<scala-maven-plugin.version>4.5.6</scala-maven-plugin.version>
<spark.version>3.2.0</spark.version>
</properties>

@@ -89,7 +88,7 @@
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.major.version}</artifactId>
<version>0.13.2</version>
<version>2.0.1-RC1</version>
</dependency>

<dependency>
@@ -147,6 +146,7 @@

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>

<plugins>
<plugin>
@@ -235,9 +235,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<finalName>${project.artifactId}_${scala.major.version}-${project.version}</finalName>
</configuration>
<executions>
<execution>
<goals>
17 changes: 17 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/PatternMatch.scala
@@ -60,6 +60,23 @@ case class PatternMatch(column: String, pattern: Regex, where: Option[String] =
override protected def additionalPreconditions(): Seq[StructType => Unit] = {
hasColumn(column) :: isString(column) :: Nil
}

// PatternMatch objects with identical parameters produce different hashCodes
// because Regex compares by reference, not by pattern.
// Fix: compare a tuple that uses the pattern's string representation instead.
private val internalObj = (column, pattern.toString(), where)

override def hashCode(): Int = {
internalObj.hashCode()
}

override def equals(obj: Any): Boolean = {
obj match {
case o: PatternMatch => internalObj.equals(o.internalObj)
case _ => false
}
}

}

object Patterns {
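For background on the equals/hashCode override above: `scala.util.matching.Regex` does not override `equals` or `hashCode`, so two regexes compiled from the same pattern string are never equal. A minimal standalone sketch of that behavior (illustrative, not part of the patch):

```
object RegexEqualityDemo extends App {
  val a = "[a-z]".r
  val b = "[a-z]".r
  println(a == b)                   // false: Regex falls back to reference equality
  println(a.toString == b.toString) // true: toString returns the pattern string
  // This is why PatternMatch keys equality on (column, pattern.toString, where)
}
```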
@@ -21,7 +21,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.control.Breaks._


class QuantileNonSample[T](
var sketchSize: Int,
var shrinkingFactor: Double = 0.64)
@@ -52,7 +51,7 @@ class QuantileNonSample[T](
this.shrinkingFactor = shrinkingFactor
compactors = ArrayBuffer.fill(data.length)(new NonSampleCompactor[T])
for (i <- data.indices) {
compactors(i).buffer = data(i).to[ArrayBuffer]
compactors(i).buffer = data(i).to(ArrayBuffer)
}
curNumOfCompactors = data.length
compactorActualSize = getCompactorItemsCount
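The `to(ArrayBuffer)` change above belongs to the Scala 2.13 collections migration: the type-parameterized `to[C]` method was removed in favor of `to(factory)`, which takes a collection companion object. A minimal standalone sketch (illustrative, not from the patch):

```
import scala.collection.mutable.ArrayBuffer

object ToFactoryDemo extends App {
  val xs = List(1, 2, 3)
  // Scala 2.12: xs.to[ArrayBuffer]
  // Scala 2.13: pass the companion object; it is implicitly adapted to a Factory
  val buf: ArrayBuffer[Int] = xs.to(ArrayBuffer)
  println(buf) // ArrayBuffer(1, 2, 3)
}
```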
4 changes: 2 additions & 2 deletions src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.aggregate.{ApproximatePercentile, DeequHyperLogLogPlusPlusUtils}
import org.apache.spark.sql.SaveMode

import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._
import scala.util.hashing.MurmurHash3

private object StateInformation {
@@ -58,7 +58,7 @@ case class InMemoryStateProvider() extends StateLoader with StatePersister {

override def toString: String = {
val buffer = new StringBuilder()
statesByAnalyzer.foreach { case (analyzer, state) =>
statesByAnalyzer.asScala.foreach { case (analyzer, state) =>
buffer.append(analyzer.toString)
buffer.append(" => ")
buffer.append(state.toString)
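The switch from `scala.collection.JavaConversions` (removed in Scala 2.13) to `scala.jdk.CollectionConverters` makes the Java-to-Scala bridging explicit via `.asScala`, as in the loop above. A minimal standalone sketch (illustrative, not from the patch):

```
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object AsScalaDemo extends App {
  val states = new ConcurrentHashMap[String, Long]()
  states.put("Size(*)", 42L)
  // .asScala wraps the Java map in a Scala view without copying
  states.asScala.foreach { case (analyzer, state) =>
    println(s"$analyzer => $state")
  }
}
```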
@@ -116,7 +116,7 @@ case class OnlineNormalStrategy(
ret += OnlineCalculationResult(currentMean, stdDev, isAnomaly = true)
}
}
ret
ret.toSeq
}
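The `.toSeq` here (and in the HoltWinters change below) is needed because Scala 2.13 redefined `scala.Seq` as an alias for `collection.immutable.Seq`, so a mutable `ArrayBuffer` no longer satisfies a declared `Seq` return type. A minimal standalone sketch (illustrative, not from the patch):

```
import scala.collection.mutable.ArrayBuffer

object ImmutableSeqDemo extends App {
  def collect(): Seq[Double] = {
    val ret = ArrayBuffer(1.0, 2.0, 3.0)
    // 2.12: ArrayBuffer conformed to scala.Seq, so returning `ret` compiled
    // 2.13: scala.Seq = immutable.Seq, so an explicit copy is required
    ret.toSeq
  }
  println(collect())
}
```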


@@ -132,7 +132,7 @@ class HoltWinters(
}
val forecasted = Y.drop(series.size)

ModelResults(forecasted, level, trend, seasonality, residuals)
ModelResults(forecasted.toSeq, level.toSeq, trend.toSeq, seasonality.toSeq, residuals.toSeq)
}

private def modelSelectionFor(
@@ -317,7 +317,7 @@ object ColumnProfiler {
Histogram(histogram.column).equals(histogram)
case _ => false
}
analyzerContextExistingValues = AnalyzerContext(relevantEntries)
analyzerContextExistingValues = AnalyzerContext(relevantEntries.toMap)
}
}
}
@@ -19,7 +19,6 @@ package com.amazon.deequ.repository
import java.lang.reflect.Type

import com.amazon.deequ.analyzers.{State, _}
import org.apache.spark.sql.functions._
import com.amazon.deequ.metrics.{Distribution, Metric, _}

import util.{Failure, Success, Try}
@@ -28,13 +27,11 @@ import com.google.gson._
import com.google.gson.reflect.TypeToken

import scala.collection._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

import JsonSerializationConstants._

import scala.collection.JavaConversions._

private[repository] object JsonSerializationConstants {

val STRING_MAP_TYPE: Type = new TypeToken[JList[JMap[String, Any]]]() {}.getType
@@ -413,22 +410,22 @@ private[deequ] object AnalyzerDeserializer
getOptionalWhereParam(json))

case "CountDistinct" =>
CountDistinct(getColumnsAsSeq(context, json))
CountDistinct(getColumnsAsSeq(context, json).toSeq)

case "Distinctness" =>
Distinctness(getColumnsAsSeq(context, json))
Distinctness(getColumnsAsSeq(context, json).toSeq)

case "Entropy" =>
Entropy(json.get(COLUMN_FIELD).getAsString)

case "MutualInformation" =>
MutualInformation(getColumnsAsSeq(context, json))
MutualInformation(getColumnsAsSeq(context, json).toSeq)

case "UniqueValueRatio" =>
UniqueValueRatio(getColumnsAsSeq(context, json))
UniqueValueRatio(getColumnsAsSeq(context, json).toSeq)

case "Uniqueness" =>
Uniqueness(getColumnsAsSeq(context, json))
Uniqueness(getColumnsAsSeq(context, json).toSeq)

case "Histogram" =>
Histogram(
@@ -571,7 +568,7 @@ private[deequ] object MetricDeserializer extends JsonDeserializer[Metric[_]] {
val instance = jsonObject.get("instance").getAsString
if (jsonObject.has("value")) {
val entries = jsonObject.get("value").getAsJsonObject
val values = entries.entrySet().map { entry =>
val values = entries.entrySet().asScala.map { entry =>
entry.getKey -> entry.getValue.getAsDouble
}
.toMap
@@ -99,8 +99,8 @@ private[repository] object MetricsRepositoryMultipleResultsLoader {

def jsonUnion(jsonOne: String, jsonTwo: String): String = {

val objectOne: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonOne)
val objectTwo: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonTwo)
val objectOne: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonOne).toSeq
val objectTwo: Seq[Map[String, Any]] = SimpleResultSerde.deserialize(jsonTwo).toSeq

val columnsTotal = objectOne.headOption.getOrElse(Map.empty).keySet ++
objectTwo.headOption.getOrElse(Map.empty).keySet
@@ -147,10 +147,10 @@ class FileSystemMetricsRepositoryMultipleResultsLoader(
.metricMap
.filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))

val requestedAnalyzerContext = AnalyzerContext(requestedMetrics)
val requestedAnalyzerContext = AnalyzerContext(requestedMetrics.toMap)

AnalysisResult(analysisResult.resultKey, requestedAnalyzerContext)
}
}.toSeq
}
}
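The extra `.toMap`/`.toSeq` materializations in these repository loaders follow from another 2.13 change: `Map.filterKeys` is deprecated in favor of a lazy, view-based variant, so results must be materialized before they are stored or returned. A minimal standalone sketch (illustrative, not from the patch):

```
object FilterKeysDemo extends App {
  val metrics = Map("Size(*)" -> 5.0, "Completeness(id)" -> 1.0)
  // In 2.13, filterKeys returns a lazy MapView; .toMap materializes it
  val sizeOnly: Map[String, Double] =
    metrics.view.filterKeys(_.startsWith("Size")).toMap
  println(sizeOnly) // Map(Size(*) -> 5.0)
}
```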

@@ -21,7 +21,7 @@ import com.amazon.deequ.metrics.Metric
import com.amazon.deequ.repository._
import com.amazon.deequ.analyzers.runners.AnalyzerContext

import scala.collection.JavaConversions._
import scala.jdk.CollectionConverters._
import java.util.concurrent.ConcurrentHashMap

/** A simple Repository implementation backed by a concurrent hash map */
@@ -117,7 +117,7 @@

/** Get the AnalysisResult */
def get(): Seq[AnalysisResult] = {
resultsRepository
resultsRepository.asScala
.filterKeys(key => after.isEmpty || after.get <= key.dataSetDate)
.filterKeys(key => before.isEmpty || key.dataSetDate <= before.get)
.filterKeys(key => tagValues.isEmpty || tagValues.get.toSet.subsetOf(key.tags.toSet))
@@ -129,7 +129,7 @@
.metricMap
.filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))

AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics))
AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics.toMap))
}
.toSeq
}
@@ -132,7 +132,7 @@ class ConstraintSuggestionRunner {
groupedSuggestionsWithColumnNames.map { case (_, suggestion) => suggestion } }

ConstraintSuggestionResult(columnProfiles.profiles, columnProfiles.numRecords,
columnsWithSuggestions, verificationResult)
columnsWithSuggestions.toMap, verificationResult)
}

private[this] def splitTrainTestSets(
7 changes: 7 additions & 0 deletions src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
@@ -679,6 +679,13 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with
"Pattern compliance analyzer" should {
val someColumnName = "some"

"PatternMatch hashCode should equal for the same pattern" in {
val p1 = PatternMatch("col1", "[a-z]".r)
val p2 = PatternMatch("col1", "[a-z]".r)
p1.hashCode() should equal(p2.hashCode())
p1 should equal(p2)
}

"not match doubles in nullable column" in withSparkSession { sparkSession =>

val df = dataFrameWithColumn(someColumnName, DoubleType, sparkSession, Row(1.1),