Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue#62: Constraint suggestion extended example #394

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.examples

import com.amazon.deequ.examples.ExampleUtils.withSpark
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}

/**
 * Extended constraint suggestion example: generates a synthetic data set from a
 * DataProfile, lets deequ profile it and prints every constraint deequ suggests.
 */
private[examples] object ConstraintSuggestionExtendedExample extends App {

  withSpark { session =>

    // Shape of the synthetic input used here:
    //   - 300 rows in total;
    //   - at least 75% of the rows have status IN_TRANSIT, the remaining rows pick
    //     their status at random among IN_TRANSIT, DELAYED and UNKNOWN;
    //   - at least 50% of the rows have valuable 'true', the remaining rows pick
    //     their valuable at random among 'true', 'false' and null.
    // Adjust the numbers as you like. Passing 0 for statusDist and/or valuableDist,
    // e.g. DataProfile(300, 0, 0), asks for the corresponding column to be generated
    // randomly.
    val profile = DataProfile(300, 75, 50)
    val inputRdd = session.sparkContext.parallelize(GenerateDataUtils.generateData(profile))
    val inputFrame = session.createDataFrame(inputRdd)

    // Ask deequ for constraint suggestions on the data: it profiles the data and then
    // applies the rules passed to addConstraintRules() to derive the suggestions.
    val suggestionResult = ConstraintSuggestionRunner()
      .onData(inputFrame)
      .addConstraintRules(Rules.DEFAULT)
      .run()

    // Print each suggestion as a textual description plus the matching scala code.
    //
    // The suggestions come from heuristic rules which assume that the data they are
    // shown is 'static' and correct; real-world data often is not, so always review
    // the suggestions manually before applying them in a real deployment.
    for {
      (column, suggestions) <- suggestionResult.constraintSuggestions
      suggestion <- suggestions
    } {
      println(s"Constraint suggestion for '$column':\t${suggestion.description}\n" +
        s"The corresponding scala code is ${suggestion.codeForConstraint}\n")
    }

  }

}
108 changes: 108 additions & 0 deletions src/main/scala/com/amazon/deequ/examples/GenerateDataUtils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

/**
* Utilities that generate random example data driven by a DataProfile:
* size: the number of rows to generate
* statusDist: minimum percentage of rows whose status is 'IN_TRANSIT'
* valuableDist: minimum percentage of rows whose valuable is 'true'
* Note: think of statusDist and valuableDist as volume knobs that turn up the
* share of 'IN_TRANSIT' statuses and 'true' valuables, respectively.
*/
package com.amazon.deequ.examples

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Describes the data set that GenerateDataUtils.generateData should produce.
 *
 * @param size         number of rows to generate
 * @param statusDist   minimum percentage of rows whose status is 'IN_TRANSIT'
 * @param valuableDist minimum percentage of rows whose valuable is 'true'
 */
case class DataProfile(
    size: Int,
    statusDist: Int,
    valuableDist: Int)

/**
 * Generates random RawData rows for the examples, driven by a DataProfile.
 *
 * The object keeps the rows of the generation run in progress in `dataList`;
 * `generateStatus` and `generateValuable` read its size to decide whether the row
 * being generated still belongs to the guaranteed share. It is therefore not safe
 * to run several generations concurrently.
 */
object GenerateDataUtils {

  val STATUSES = Array("IN_TRANSIT", "DELAYED", "UNKNOWN")
  // The third entry is deliberately null so that generated data contains missing values.
  val VALUABLES = Array("true", "false", null)
  val ran = new Random()

  // Rows generated by the current / most recent generateData run.
  var dataList = ListBuffer.empty[RawData]

  /**
   * Generates data according to the given DataProfile.
   *
   * DataProfile.size: number of rows to generate
   * DataProfile.statusDist: minimum percent of rows with status equal to 'IN_TRANSIT'
   * DataProfile.valuableDist: minimum percent of rows with valuable equal to 'true'
   *
   * Every call starts from an empty buffer, so the result only contains the rows of
   * this call, and the returned sequence is an immutable snapshot that later calls
   * cannot modify.
   *
   * @param dataProfile description of the data to generate; it is rejected when it is
   *                    null, when size is not positive or when statusDist or
   *                    valuableDist exceeds 100
   * @return the generated rows, or an empty sequence (after printing a message) when
   *         the profile is rejected
   */
  def generateData(dataProfile: DataProfile): Seq[RawData] = {
    // Drop the rows of any earlier run: they would otherwise be returned again and
    // would skew the size-based decisions in generateStatus / generateValuable.
    dataList = ListBuffer.empty[RawData]

    val profileIsValid =
      dataProfile != null &&
        dataProfile.size > 0 &&
        dataProfile.statusDist <= 100 &&
        dataProfile.valuableDist <= 100

    if (profileIsValid) {
      (1 to dataProfile.size).foreach { i =>
        addData(
          RawData(
            "thing" + i,
            generateTotalNumber(50), // Generates values between 0(inclusive) & 50(exclusive)
            generateStatus(dataProfile), // Generates status according to DataProfile.statusDist
            generateValuable(dataProfile) // Generates valuable according to DataProfile.valuableDist
          )
        )
      }
    } else {
      println("Data not generated, review DataProfile values.")
    }
    // Uncomment to print the generated rows: showData(dataList)

    // toList copies the rows; toSeq on a ListBuffer may hand out the buffer itself.
    dataList.toList
  }

  /**
   * Generates totalNumber from zero inclusive to maxValue exclusive.
   *
   * @param maxValue exclusive upper bound, passed straight to Random.nextInt
   * @return the drawn integer converted to Float and rendered as a String, e.g. "7.0"
   */
  def generateTotalNumber(maxValue: Int): String = {
    ran.nextInt(maxValue).toFloat.toString
  }

  /**
   * Generates a status for the row that is about to be added to `dataList`.
   *
   * While fewer than statusDist percent of the requested rows exist, the status is
   * always 'IN_TRANSIT'; afterwards it is picked at random from STATUSES. A statusDist
   * of 0 therefore makes every status random.
   *
   * @param dataProfile profile providing size and statusDist
   * @return one of the entries of STATUSES
   */
  def generateStatus(dataProfile: DataProfile): String = {
    if (withinGuaranteedShare(dataProfile, dataProfile.statusDist)) {
      STATUSES(0) // IN_TRANSIT
    } else {
      STATUSES(ran.nextInt(STATUSES.length)) // IN_TRANSIT, DELAYED or UNKNOWN
    }
  }

  /**
   * Generates a valuable for the row that is about to be added to `dataList`.
   *
   * While fewer than valuableDist percent of the requested rows exist, the valuable is
   * always 'true'; afterwards it is picked at random from VALUABLES. A valuableDist of
   * 0 therefore makes every valuable random.
   *
   * @param dataProfile profile providing size and valuableDist
   * @return one of the entries of VALUABLES, which includes null
   */
  def generateValuable(dataProfile: DataProfile): String = {
    if (withinGuaranteedShare(dataProfile, dataProfile.valuableDist)) {
      VALUABLES(0) // true
    } else {
      VALUABLES(ran.nextInt(VALUABLES.length)) // true, false or null
    }
  }

  // True while the number of rows generated so far is below `percent` percent of
  // dataProfile.size. Uses exact Long arithmetic rather than comparing Float ratios.
  private def withinGuaranteedShare(dataProfile: DataProfile, percent: Int): Boolean =
    dataList.size.toLong * 100L < dataProfile.size.toLong * percent.toLong

  /** Appends a row to the rows of the current generation run. */
  def addData(rawData: RawData): Unit =
    dataList += rawData

  /** Prints every row of the given buffer, one per line. */
  def showData(dataList: ListBuffer[RawData]): Unit = {
    dataList.foreach(rawData => println(rawData))
  }

}