Skip to content

Commit

Permalink
ISP Lookup (#1117)
Browse files Browse the repository at this point in the history
* Add ISP lookup step to decoder

* Add tests for ISP lookup

* Add ASN to ISP lookup

* Update tests with ISP lookup

* Use BeamFileInputStream for reading ISP database

* Remove ASN from ISP lookup

* Remove println from test

* fix GeoIsp counter

* Fix GeoIspLookup tests

* Fix import ordering
  • Loading branch information
scholtzan authored May 21, 2020
1 parent ad1184a commit de9626f
Show file tree
Hide file tree
Showing 12 changed files with 12,895 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.mozilla.telemetry.decoder.DecryptPioneerPayloads;
import com.mozilla.telemetry.decoder.Deduplicate;
import com.mozilla.telemetry.decoder.GeoCityLookup;
import com.mozilla.telemetry.decoder.GeoIspLookup;
import com.mozilla.telemetry.decoder.ParsePayload;
import com.mozilla.telemetry.decoder.ParseProxy;
import com.mozilla.telemetry.decoder.ParseUri;
Expand Down Expand Up @@ -58,10 +59,11 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
Optional.of(pipeline) //
.map(p -> p //
.apply(options.getInputType().read(options)) //
// We apply ParseProxy and GeoCityLookup first so that IP address is already removed
// before any message gets routed to error output; see
// We apply ParseProxy and GeoCityLookup and GeoIspLookup first so that IP
// address is already removed before any message gets routed to error output; see
// https://github.com/mozilla/gcp-ingestion/issues/1096
.apply(ParseProxy.of()) //
.apply(GeoIspLookup.of(options.getGeoIspDatabase())) //
.apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections) //
.apply(DecompressPayload.enabled(options.getDecompressInputPayloads())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public interface DecoderOptions extends SinkOptions, PipelineOptions {

void setGeoCityFilter(ValueProvider<String> value);

/**
 * Returns the configured location of the ISP database file.
 *
 * <p>The value is passed to {@code GeoIspLookup.of(...)} when the decoder pipeline is assembled.
 * The option is marked {@code @Validation.Required}, so it must be supplied.
 *
 * @return provider of the database path (per the description: a local path or a gs:// URI)
 */
@Description("Path (local or gs://) to GeoIP2-ISP.mmdb")
@Validation.Required
ValueProvider<String> getGeoIspDatabase();

/**
 * Sets the location of the ISP database file; counterpart of {@code getGeoIspDatabase()}.
 *
 * @param value provider of the database path
 */
void setGeoIspDatabase(ValueProvider<String> value);

@Description("URI of a redis server that will be used for deduplication; leave null to disable")
ValueProvider<String> getRedisUri();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package com.mozilla.telemetry.decoder;

import com.google.common.annotations.VisibleForTesting;
import com.maxmind.db.CHMCache;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.IspResponse;
import com.mozilla.telemetry.ingestion.core.Constant.Attribute;
import com.mozilla.telemetry.transforms.PubsubConstraints;
import com.mozilla.telemetry.util.BeamFileInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

/**
 * Transform that adds ISP attributes to each {@link PubsubMessage} based on the client IP found
 * in the message's attributes.
 *
 * <p>For every message that does not already carry {@code Attribute.ISP_NAME}, the transform
 * picks a client IP from {@code Attribute.X_FORWARDED_FOR} (falling back to
 * {@code Attribute.REMOTE_ADDR}), looks it up in the configured ISP database, and emits a new
 * message with {@code Attribute.ISP_DB_VERSION}, {@code Attribute.ISP_NAME} and
 * {@code Attribute.ISP_ORGANIZATION} set where available. The payload is passed through
 * unchanged.
 */
public class GeoIspLookup
    extends PTransform<PCollection<PubsubMessage>, PCollection<PubsubMessage>> {

  /**
   * Creates a lookup transform backed by the given database location.
   *
   * @param ispDatabase provider of the path to the ISP database file
   * @return a new transform instance
   */
  public static GeoIspLookup of(ValueProvider<String> ispDatabase) {
    return new GeoIspLookup(ispDatabase);
  }

  /////////

  // Shared by all Fn instances in this JVM; only read or written inside the synchronized static
  // methods below. Static fields are never serialized, so no 'transient' modifier is needed.
  private static DatabaseReader singletonIspReader;

  private final ValueProvider<String> geoIspDatabase;

  private GeoIspLookup(ValueProvider<String> ispDatabase) {
    this.geoIspDatabase = ispDatabase;
  }

  /** Drops the cached reader so that the next message triggers a fresh load of the database. */
  @VisibleForTesting
  static synchronized void clearSingletonsForTests() {
    singletonIspReader = null;
  }

  /**
   * Returns the shared database reader, creating it on first use.
   *
   * <p>On first use the configured database is copied to {@code GeoIspLookup.mmdb} in the
   * directory named by the {@code java.io.tmpdir} system property and the reader is built from
   * that local copy.
   *
   * @param ispDatabase provider of the database location
   * @return the shared reader
   * @throws IOException if the database cannot be fetched, copied, or opened
   */
  private static synchronized DatabaseReader getOrCreateSingletonIspReader(
      ValueProvider<String> ispDatabase) throws IOException {
    if (singletonIspReader == null) {
      Path mmdbPath = Paths.get(System.getProperty("java.io.tmpdir"), "GeoIspLookup.mmdb");

      // try-with-resources guarantees the source stream is closed once the copy has finished or
      // failed; previously the stream was left open.
      try (InputStream inputStream = BeamFileInputStream.open(ispDatabase.get())) {
        Files.copy(inputStream, mmdbPath, StandardCopyOption.REPLACE_EXISTING);
      } catch (IOException e) {
        throw new IOException("Exception thrown while fetching configured geoIspDatabase", e);
      }

      File mmdb = mmdbPath.toFile();
      singletonIspReader = new DatabaseReader.Builder(mmdb).withCache(new CHMCache()).build();
    }
    return singletonIspReader;
  }

  /**
   * Per-message function performing the ISP lookup.
   *
   * <p>This is intentionally a non-static inner class: it reads {@code geoIspDatabase} from the
   * enclosing transform when loading the reader.
   */
  @VisibleForTesting
  public class Fn extends SimpleFunction<PubsubMessage, PubsubMessage> {

    // Populated lazily on the first message; see loadResourcesOnFirstMessage().
    private transient DatabaseReader ispReader;

    private final Counter foundIp = Metrics.counter(GeoIspLookup.Fn.class, "found_ip");
    private final Counter countIspAlreadyApplied = Metrics.counter(Fn.class,
        "isp_already_applied");
    private final Counter foundIsp = Metrics.counter(Fn.class, "found_isp");
    private final Counter countIpForwarded = Metrics.counter(GeoIspLookup.Fn.class,
        "ip_from_x_forwarded_for");
    private final Counter countIpRemoteAddr = Metrics.counter(GeoIspLookup.Fn.class,
        "ip_from_remote_addr");

    /**
     * Returns a copy of {@code message} with ISP attributes added, or {@code message} itself
     * when it already carries {@code Attribute.ISP_NAME}.
     *
     * @param message the incoming message
     * @return the (possibly enriched) message
     * @throws UncheckedIOException if the database cannot be loaded or read
     * @throws IllegalArgumentException if no database location is available
     */
    @Override
    public PubsubMessage apply(PubsubMessage message) {
      message = PubsubConstraints.ensureNonNull(message);

      try {
        if (ispReader == null) {
          loadResourcesOnFirstMessage();
        }

        if (message.getAttributeMap().containsKey(Attribute.ISP_NAME)) {
          // Return early since ISP lookup has already been performed.
          countIspAlreadyApplied.inc();
          return message;
        }

        // copy attributes
        Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

        // Determine client ip
        String ip;
        String xff = attributes.get(Attribute.X_FORWARDED_FOR);

        if (xff != null) {
          // Google's load balancer will append the immediate sending client IP and a global
          // forwarding rule IP to any existing content in X-Forwarded-For as documented in:
          // https://cloud.google.com/load-balancing/docs/https/#components
          //
          // In practice, many of the "first" addresses are bogus or internal,
          // so we target the immediate sending client IP.
          String[] ips = xff.split("\\s*,\\s*");
          // Second-to-last entry, or the only entry when the header holds a single address.
          ip = ips[Math.max(ips.length - 2, 0)];
          countIpForwarded.inc();
        } else {
          ip = attributes.getOrDefault(Attribute.REMOTE_ADDR, "");
          countIpRemoteAddr.inc();
        }

        try {
          // The database version is recorded before the lookup, so it is present even when the
          // lookup below fails.
          attributes.put(Attribute.ISP_DB_VERSION, DateTimeFormatter.ISO_INSTANT
              .format(Instant.ofEpochMilli(ispReader.getMetadata().getBuildDate().getTime())));

          // Throws UnknownHostException
          InetAddress ipAddress = InetAddress.getByName(ip);
          foundIp.inc();

          IspResponse response = ispReader.isp(ipAddress);
          foundIsp.inc();

          attributes.put(Attribute.ISP_NAME, response.getIsp());
          attributes.put(Attribute.ISP_ORGANIZATION, response.getOrganization());
        } catch (UnknownHostException | GeoIp2Exception ignored) {
          // Deliberately best-effort: an unparseable address or a failed database lookup leaves
          // the message without ISP name/organization instead of failing the pipeline.
        }

        // remove null attributes because the coder can't handle them
        attributes.values().removeIf(Objects::isNull);

        return new PubsubMessage(message.getPayload(), attributes);
      } catch (IOException e) {
        // Re-throw unchecked, so that the pipeline will fail at run time if it occurs
        throw new UncheckedIOException(e);
      }
    }

    /**
     * Resolves the shared database reader for this function instance.
     *
     * @throws IOException if the database cannot be fetched or opened
     * @throws IllegalArgumentException if the database option is missing or not accessible
     */
    private void loadResourcesOnFirstMessage() throws IOException {
      if (geoIspDatabase == null || !geoIspDatabase.isAccessible()) {
        throw new IllegalArgumentException("--geoIspDatabase must be defined for IspLookup");
      }

      ispReader = getOrCreateSingletonIspReader(geoIspDatabase);
    }
  }

  private final GeoIspLookup.Fn fn = new Fn();

  @Override
  public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
    return input.apply(MapElements.via(fn));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void testBasicErrorOutput() throws Exception {
"--outputFileFormat=json", "--outputType=file", "--output=" + output,
"--errorOutputType=file", "--errorOutput=" + errorOutput,
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--errorOutputFileCompression=UNCOMPRESSED" });

List<String> errorOutputLines = Lines.files(errorOutput + "*.ndjson");
Expand All @@ -61,9 +62,11 @@ public void testMixedErrorCases() throws Exception {
"--errorOutputType=file", "--errorOutput=" + errorOutput, "--includeStackTrace=false",
"--outputFileCompression=UNCOMPRESSED", "--errorOutputFileCompression=UNCOMPRESSED",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--redisUri=" + redis.uri });

List<String> outputLines = Lines.files(output + "*.ndjson");

List<String> expectedOutputLines = Lines.files(resourceDir + "/output.ndjson");
assertThat("Main output differed from expectation", outputLines,
matchesInAnyOrder(expectedOutputLines));
Expand All @@ -87,6 +90,7 @@ public void testGzippedPayload() throws Exception {
"--outputFileCompression=UNCOMPRESSED", "--errorOutputType=file",
"--errorOutput=" + errorOutput, "--includeStackTrace=false",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--redisUri=" + redis.uri });

List<String> outputLines = Lines.files(output + "*.ndjson");
Expand Down Expand Up @@ -187,6 +191,7 @@ public void testIdempotence() throws Exception {
"--includeStackTrace=false", "--outputFileCompression=UNCOMPRESSED",
"--errorOutputFileCompression=UNCOMPRESSED",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--redisUri=" + redis.uri });

Decoder.main(new String[] { "--inputFileFormat=json", "--inputType=file", //
Expand All @@ -197,6 +202,7 @@ public void testIdempotence() throws Exception {
"--includeStackTrace=false", "--outputFileCompression=UNCOMPRESSED",
"--errorOutputFileCompression=UNCOMPRESSED",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--redisUri=" + redis.uri });

List<String> outputLines = Lines.files(output + "*.ndjson");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.mozilla.telemetry.decoder;

import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import com.google.common.collect.Lists;
import com.mozilla.telemetry.options.InputFileFormat;
import com.mozilla.telemetry.options.OutputFileFormat;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/** Tests for {@link GeoIspLookup} run against the ISP test database under test resources. */
public class GeoIspLookupTest {

  private static final String MMDB = "src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb";

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  /**
   * Runs three messages through the lookup and checks both the enriched output and that all
   * counters in the {@code GeoIspLookup.Fn} namespace were incremented.
   */
  @Test
  public void testOutput() {
    // One message has no IP at all; the other two carry addresses for which the expected output
    // below contains ISP attributes from the ISP test database.
    // NOTE(review): the test db presumably comes from MaxMind's test data; the comment previously
    // here linked the City source json, confirm whether the ISP json is the intended reference:
    // https://github.com/maxmind/MaxMind-DB/blob/664aeeb08bb50f53a1fdceac763c37f6465e44a4/source-data/GeoIP2-City-Test.json
    final String noIpMessage = "{\"attributeMap\":{\"host\":\"test\"},\"payload\":\"dGVzdA==\"}";
    final String remoteAddrMessage = //
        "{\"attributeMap\":{\"remote_addr\":\"24.38.243.141\"},\"payload\":\"\"}";
    final String forwardedMessage = "{\"attributeMap\":" //
        + "{\"remote_addr\":\"10.0.0.2\"" //
        + ",\"x_forwarded_for\":\"192.168.1.2, 23.32.32.1, 23.32.32.11\"" //
        + "},\"payload\":\"\"}";
    final List<String> messages = Arrays.asList(noIpMessage, remoteAddrMessage,
        forwardedMessage);

    final String expectedNoIp = "{\"attributeMap\":" //
        + "{\"host\":\"test\"" //
        + ",\"isp_db_version\":\"2018-01-15T22:27:16Z\"" //
        + "},\"payload\":\"dGVzdA==\"}";
    final String expectedForwarded = "{\"attributeMap\":" //
        + "{\"isp_name\":\"Akamai Technologies\"" //
        + ",\"remote_addr\":\"10.0.0.2\"" //
        + ",\"isp_db_version\":\"2018-01-15T22:27:16Z\"" //
        + ",\"isp_organization\":\"Akamai Technologies\"" //
        + ",\"x_forwarded_for\":\"192.168.1.2, 23.32.32.1, 23.32.32.11\"}" //
        + ",\"payload\":\"\"}";
    final String expectedRemoteAddr = "{\"attributeMap\":" //
        + "{\"remote_addr\":\"24.38.243.141\"" //
        + ",\"isp_db_version\":\"2018-01-15T22:27:16Z\"" //
        + ",\"isp_organization\":\"LAWN MULLEN & GOOD INTERNATIONAL\"}" + ",\"payload\":\"\"}";
    final List<String> expectedMessages = Arrays.asList(expectedNoIp, expectedForwarded,
        expectedRemoteAddr);

    final PCollection<String> encoded = pipeline //
        .apply(Create.of(messages)) //
        .apply(InputFileFormat.json.decode()) //
        .apply(GeoIspLookup.of(pipeline.newProvider(MMDB))) //
        .apply(OutputFileFormat.json.encode());

    PAssert.that(encoded).containsInAnyOrder(expectedMessages);

    // Reset the cached reader so this run loads the database itself.
    GeoIspLookup.clearSingletonsForTests();
    final PipelineResult result = pipeline.run();

    final MetricsFilter fnCounters = MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.inNamespace(GeoIspLookup.Fn.class)).build();
    final List<MetricResult<Long>> counters = Lists
        .newArrayList(result.metrics().queryMetrics(fnCounters).getCounters());

    assertEquals(4, counters.size());
    for (MetricResult<Long> counter : counters) {
      assertThat(counter.getCommitted(), greaterThan(0L));
    }
  }

  /** Expects the pipeline to fail with an {@link UncheckedIOException} cause for a bad path. */
  @Test
  public void testThrowsOnMissingIspDatabase() {
    thrown.expectCause(IsInstanceOf.instanceOf(UncheckedIOException.class));

    final String noIpMessage = "{\"attributeMap\":{\"host\":\"test\"},\"payload\":\"dGVzdA==\"}";
    final List<String> messages = Collections.singletonList(noIpMessage);

    pipeline //
        .apply(Create.of(messages)) //
        .apply(InputFileFormat.json.decode()) //
        .apply(GeoIspLookup.of(pipeline.newProvider("missing-file.mmdb")));

    // Reset the cached reader so the missing file is actually opened during this run.
    GeoIspLookup.clearSingletonsForTests();
    pipeline.run();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void testDecoderWithRemoteInputAndOutput() throws Exception {
"--outputFileCompression=UNCOMPRESSED", "--errorOutputFileCompression=UNCOMPRESSED",
"--errorOutputType=file", "--errorOutput=" + errorOutput, "--includeStackTrace=false",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz", "--redisUri=" + redis.uri });

tempFolder.newFolder("out");
Expand All @@ -99,7 +100,7 @@ public void testDecoderWithRemoteInputAndOutput() throws Exception {
}

@Test
public void testCompileDataflowTemplate() throws Exception {
public void testCompileDataflowTemplate() {
String gcsPath = "gs://" + bucket;

Decoder.main(new String[] { "--runner=Dataflow", "--project=" + projectId,
Expand All @@ -108,8 +109,8 @@ public void testCompileDataflowTemplate() throws Exception {
"--inputType=file", "--outputFileFormat=json", "--outputType=file",
"--errorOutputType=file",
"--geoCityDatabase=src/test/resources/cityDB/GeoIP2-City-Test.mmdb",
"--geoIspDatabase=src/test/resources/ispDB/GeoIP2-ISP-Test.mmdb",
"--schemasLocation=schemas.tar.gz" });

}

private void uploadInputFile(String localPath) throws IOException {
Expand All @@ -119,7 +120,7 @@ private void uploadInputFile(String localPath) throws IOException {
storage.create(BlobInfo.newBuilder(BlobId.of(bucket, remotePath)).build(), content);
}

private void downloadOutputFiles(String prefix) throws IOException {
private void downloadOutputFiles(String prefix) {
Page<Blob> blobs = storage.list(bucket, BlobListOption.prefix(prefix));
for (Blob blob : blobs.iterateAll()) {
blob.downloadTo(Paths.get(tempFolder.getRoot().getPath(), blob.getName()));
Expand Down
Loading

0 comments on commit de9626f

Please sign in to comment.