From 68a7501cb786e9e7ea413042b204a8d919c195e8 Mon Sep 17 00:00:00 2001 From: Fernando Canchola Cruz Date: Mon, 21 Jul 2025 19:20:08 -0600 Subject: [PATCH 1/2] Refactor SQL module to use PreparedStatement (#1611) --- .../apache/stormcrawler/sql/IndexerBolt.java | 109 +++++------------- .../org/apache/stormcrawler/sql/SQLSpout.java | 91 +++++++-------- 2 files changed, 68 insertions(+), 132 deletions(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index bc3842239..c01717df6 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.stormcrawler.sql; import static org.apache.stormcrawler.Constants.StatusStreamName; @@ -21,6 +5,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Locale; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.storm.metric.api.MultiCountMetric; @@ -43,83 +28,60 @@ public class IndexerBolt extends AbstractIndexerBolt { public static final String SQL_INDEX_TABLE_PARAM_NAME = "sql.index.table"; private OutputCollector _collector; - private MultiCountMetric eventCounter; - private Connection connection; - private String tableName; - private Map conf; @Override - public void prepare( - Map conf, TopologyContext context, OutputCollector collector) { + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { super.prepare(conf, context, collector); _collector = collector; - this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10); - this.tableName = ConfUtils.getString(conf, SQL_INDEX_TABLE_PARAM_NAME); - this.conf = conf; } @Override public void execute(Tuple tuple) { String url = tuple.getStringByField("url"); - - // Distinguish the value used for indexing - // from the one used for the status String normalisedurl = valueForURL(tuple); - Metadata metadata = (Metadata) tuple.getValueByField("metadata"); String text = tuple.getStringByField("text"); boolean keep = filterDocument(metadata); if (!keep) { eventCounter.scope("Filtered").incrBy(1); - // treat it as successfully processed even if - // we do not index it _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); _collector.ack(tuple); return; } try { - - // which metadata to display? 
Map keyVals = filterMetadata(metadata); - - StringBuilder query = - new StringBuilder(" insert into ") - .append(tableName) - .append(" (") - .append(fieldNameForURL()); - Object[] keys = keyVals.keySet().toArray(); - for (int i = 0; i < keys.length; i++) { - query.append(", ").append((String) keys[i]); - } - - query.append(") values(?"); + StringBuilder fieldsBuilder = new StringBuilder(fieldNameForURL()); + StringBuilder placeholdersBuilder = new StringBuilder("?"); + StringBuilder updatesBuilder = new StringBuilder(); - for (int i = 0; i < keys.length; i++) { - query.append(", ?"); - } - - query.append(")"); - - query.append(" ON DUPLICATE KEY UPDATE "); for (int i = 0; i < keys.length; i++) { String key = (String) keys[i]; - if (i > 0) { - query.append(", "); - } - query.append(key).append("=VALUES(").append(key).append(")"); + fieldsBuilder.append(", ").append(key); + placeholdersBuilder.append(", ?"); + if (i > 0) updatesBuilder.append(", "); + updatesBuilder.append(key).append("=VALUES(").append(key).append(")"); } + String sql = String.format( + Locale.ROOT, + "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", + tableName, + fieldsBuilder, + placeholdersBuilder, + updatesBuilder + ); + if (connection == null) { try { connection = SQLUtil.getConnection(conf); @@ -129,62 +91,49 @@ public void execute(Tuple tuple) { } } - LOG.debug("PreparedStatement => {}", query); + LOG.debug("PreparedStatement => {}", sql); + PreparedStatement preparedStmt = connection.prepareStatement(sql); - // create the mysql insert preparedstatement - PreparedStatement preparedStmt = connection.prepareStatement(query.toString()); - - // TODO store the text of the document? - if (StringUtils.isNotBlank(fieldNameForText())) { - // builder.field(fieldNameForText(), trimText(text)); - } - - // send URL as field? 
- if (fieldNameForURL() != null) { - preparedStmt.setString(1, normalisedurl); - } + // Set URL as first parameter + preparedStmt.setString(1, normalisedurl); + // Set metadata values for (int i = 0; i < keys.length; i++) { insert(preparedStmt, i + 2, (String) keys[i], keyVals); } preparedStmt.executeUpdate(); + preparedStmt.close(); eventCounter.scope("Indexed").incrBy(1); - _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); _collector.ack(tuple); } catch (Exception e) { - // do not send to status stream so that it gets replayed LOG.error("Error inserting into SQL", e); _collector.fail(tuple); if (connection != null) { - // reset the connection try { connection.close(); } catch (SQLException e1) { + // ignore } connection = null; } } } - private void insert( - PreparedStatement preparedStmt, - int position, - String label, - Map keyVals) + private void insert(PreparedStatement preparedStmt, int position, String label, Map keyVals) throws SQLException { String[] values = keyVals.get(label); String value = ""; if (values == null || values.length == 0) { LOG.info("No values found for label {}", label); - } else if (values.length > 1) { - LOG.info("More than one value found for label {}", label); - value = values[0]; } else { value = values[0]; + if (values.length > 1) { + LOG.info("More than one value found for label {}", label); + } } preparedStmt.setString(position, value); } diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index facb14094..6d937f892 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -19,9 +19,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -44,22 +44,11 @@ public class SQLSpout extends AbstractQueryingSpout { private static final Scheme SCHEME = new StringTabScheme(); private String tableName; - private Connection connection; - - /** - * if more than one instance of the spout exist, each one is in charge of a separate bucket - * value. This is used to ensure a good diversity of URLs. 
- */ private int bucketNum = -1; - - /** Used to distinguish between instances in the logs * */ protected String logIdprefix = ""; - private int maxDocsPerBucket; - private int maxNumResults; - private Instant lastNextFetchDate = null; @Override @@ -69,9 +58,7 @@ public void open( super.open(conf, context, collector); maxDocsPerBucket = ConfUtils.getInt(conf, Constants.SQL_MAX_DOCS_BUCKET_PARAM_NAME, 5); - tableName = ConfUtils.getString(conf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); - maxNumResults = ConfUtils.getInt(conf, Constants.SQL_MAXRESULTS_PARAM_NAME, 100); try { @@ -81,7 +68,6 @@ public void open( throw new RuntimeException(ex); } - // determine bucket this spout instance will be in charge of int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (totalTasks > 1) { logIdprefix = @@ -113,73 +99,74 @@ protected void populateBuffer() { } } - // select entries from mysql - // https://mariadb.com/kb/en/library/window-functions-overview/ - // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/ - - String query = - "SELECT * from (select rank() over (partition by host order by nextfetchdate desc, url) as ranking, url, metadata, nextfetchdate from " - + tableName; + int alreadyprocessed = 0; + int numhits = 0; + long timeStartQuery = System.currentTimeMillis(); - query += - " WHERE nextfetchdate <= '" + new Timestamp(lastNextFetchDate.toEpochMilli()) + "'"; + PreparedStatement pstmt = null; + ResultSet rs = null; - // constraint on bucket num - if (bucketNum >= 0) { - query += " AND bucket = '" + bucketNum + "'"; - } + try { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT * FROM ("); + queryBuilder.append("SELECT RANK() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, "); + queryBuilder.append("url, metadata, nextfetchdate FROM ").append(tableName); + queryBuilder.append(" WHERE nextfetchdate <= ?"); + if (bucketNum >= 0) { + queryBuilder.append(" AND bucket = ?"); + } + queryBuilder.append(") AS urls_ranks WHERE urls_ranks.ranking <= ? "); + if (maxNumResults != -1) { + queryBuilder.append("ORDER BY ranking LIMIT ?"); + } else { + queryBuilder.append("ORDER BY ranking"); + } - query += - ") as urls_ranks where (urls_ranks.ranking <= " - + maxDocsPerBucket - + ") order by ranking"; + String query = queryBuilder.toString(); + LOG.debug("{} SQL query: {}", logIdprefix, query); - if (maxNumResults != -1) { - query += " LIMIT " + this.maxNumResults; - } + pstmt = connection.prepareStatement(query); - int alreadyprocessed = 0; - int numhits = 0; + int paramIndex = 1; + pstmt.setTimestamp(paramIndex++, new Timestamp(lastNextFetchDate.toEpochMilli())); - long timeStartQuery = System.currentTimeMillis(); + if (bucketNum >= 0) { + pstmt.setInt(paramIndex++, bucketNum); + } - // create the java statement - Statement st = null; - ResultSet rs = null; - try { - st = this.connection.createStatement(); + pstmt.setInt(paramIndex++, maxDocsPerBucket); - // dump query to log - LOG.debug("{} SQL query {}", logIdprefix, query); + if (maxNumResults != -1) { + pstmt.setInt(paramIndex++, maxNumResults); + } - // execute the query, and get a java resultset - rs = st.executeQuery(query); + rs = pstmt.executeQuery(); long timeTaken = System.currentTimeMillis() - timeStartQuery; queryTimes.addMeasurement(timeTaken); - // iterate through the java resultset while (rs.next()) { String url = rs.getString("url"); numhits++; - // already processed? 
skip + if (beingProcessed.containsKey(url)) { alreadyprocessed++; continue; } + String metadata = rs.getString("metadata"); if (metadata == null) { metadata = ""; } else if (!metadata.startsWith("\t")) { metadata = "\t" + metadata; } + String URLMD = url + metadata; List v = SCHEME.deserialize(ByteBuffer.wrap(URLMD.getBytes(StandardCharsets.UTF_8))); buffer.add(url, (Metadata) v.get(1)); } - // no results? reset the date if (numhits == 0) { lastNextFetchDate = null; } @@ -204,9 +191,9 @@ protected void populateBuffer() { LOG.error("Exception closing resultset", e); } try { - if (st != null) st.close(); + if (pstmt != null) pstmt.close(); } catch (SQLException e) { - LOG.error("Exception closing statement", e); + LOG.error("Exception closing prepared statement", e); } } } From b2c90bb6972c7618781ccc7f5d7892c9f20248c8 Mon Sep 17 00:00:00 2001 From: Fernando Canchola Cruz Date: Tue, 22 Jul 2025 12:27:18 -0600 Subject: [PATCH 2/2] Restore license headers and comments to SQLSpout and IndexerBolt --- .../apache/stormcrawler/sql/IndexerBolt.java | 70 +++++++++++++++---- .../org/apache/stormcrawler/sql/SQLSpout.java | 20 +++++- 2 files changed, 74 insertions(+), 16 deletions(-) diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index c01717df6..3bd11752e 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
 package org.apache.stormcrawler.sql;
 
 import static org.apache.stormcrawler.Constants.StatusStreamName;
@@ -20,7 +36,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Stores URL and selected metadata into a SQL table * */
+/** Stores URL and selected metadata into a SQL table */
 public class IndexerBolt extends AbstractIndexerBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class);
@@ -31,10 +47,11 @@ public class IndexerBolt extends AbstractIndexerBolt {
     private MultiCountMetric eventCounter;
     private Connection connection;
     private String tableName;
-    private Map conf;
+    private Map<String, Object> conf;
 
     @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+    public void prepare(
+            Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
         super.prepare(conf, context, collector);
         _collector = collector;
         this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10);
@@ -45,22 +62,30 @@ public void prepare(Map conf, TopologyContext context, OutputCol
 
     @Override
     public void execute(Tuple tuple) {
         String url = tuple.getStringByField("url");
+
+        // Distinguish the value used for indexing
+        // from the one used for the status
         String normalisedurl = valueForURL(tuple);
+
         Metadata metadata = (Metadata) tuple.getValueByField("metadata");
         String text = tuple.getStringByField("text");
 
         boolean keep = filterDocument(metadata);
         if (!keep) {
             eventCounter.scope("Filtered").incrBy(1);
+            // treat it as successfully processed even if
+            // we do not index it
             _collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
             _collector.ack(tuple);
             return;
         }
 
         try {
+            // which metadata to display?
             Map keyVals = filterMetadata(metadata);
 
             Object[] keys = keyVals.keySet().toArray();
 
+            // Build the SQL statement with PreparedStatement placeholders
             StringBuilder fieldsBuilder = new StringBuilder(fieldNameForURL());
             StringBuilder placeholdersBuilder = new StringBuilder("?");
             StringBuilder updatesBuilder = new StringBuilder();
@@ -73,14 +98,14 @@ public void execute(Tuple tuple) {
                 updatesBuilder.append(key).append("=VALUES(").append(key).append(")");
             }
 
-            String sql = String.format(
-                    Locale.ROOT,
-                    "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
-                    tableName,
-                    fieldsBuilder,
-                    placeholdersBuilder,
-                    updatesBuilder
-            );
+            String sql =
+                    String.format(
+                            Locale.ROOT,
+                            "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
+                            tableName,
+                            fieldsBuilder,
+                            placeholdersBuilder,
+                            updatesBuilder);
 
             if (connection == null) {
                 try {
@@ -92,12 +117,21 @@ public void execute(Tuple tuple) {
             }
 
             LOG.debug("PreparedStatement => {}", sql);
+
+            // Create the MySQL insert PreparedStatement
             PreparedStatement preparedStmt = connection.prepareStatement(sql);
 
-            // Set URL as first parameter
-            preparedStmt.setString(1, normalisedurl);
+            // TODO store the text of the document?
+            if (StringUtils.isNotBlank(fieldNameForText())) {
+                // builder.field(fieldNameForText(), trimText(text));
+            }
+
+            // Send URL as first parameter
+            if (fieldNameForURL() != null) {
+                preparedStmt.setString(1, normalisedurl);
+            }
 
-            // Set metadata values
+            // Send metadata values
             for (int i = 0; i < keys.length; i++) {
                 insert(preparedStmt, i + 2, (String) keys[i], keyVals);
             }
@@ -110,9 +144,11 @@ public void execute(Tuple tuple) {
             _collector.ack(tuple);
 
         } catch (Exception e) {
+            // do not send to status stream so that it gets replayed
             LOG.error("Error inserting into SQL", e);
             _collector.fail(tuple);
             if (connection != null) {
+                // reset the connection
                 try {
                     connection.close();
                 } catch (SQLException e1) {
@@ -123,7 +159,11 @@ public void execute(Tuple tuple) {
         }
     }
 
-    private void insert(PreparedStatement preparedStmt, int position, String label, Map keyVals)
+    private void insert(
+            PreparedStatement preparedStmt,
+            int position,
+            String label,
+            Map<String, String[]> keyVals)
             throws SQLException {
         String[] values = keyVals.get(label);
         String value = "";
diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
index 6d937f892..586d0b8f3 100644
--- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
+++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java
@@ -44,11 +44,22 @@ public class SQLSpout extends AbstractQueryingSpout {
     private static final Scheme SCHEME = new StringTabScheme();
 
     private String tableName;
+
     private Connection connection;
+
+    /**
+     * If more than one instance of the spout exists, each one is in charge of a separate bucket
+     * value. This is used to ensure a good diversity of URLs.
+     */
     private int bucketNum = -1;
+
+    /** Used to distinguish between instances in the logs */
     protected String logIdprefix = "";
+
     private int maxDocsPerBucket;
+
     private int maxNumResults;
+
     private Instant lastNextFetchDate = null;
 
     @Override
@@ -68,6 +79,7 @@ public void open(
             throw new RuntimeException(ex);
         }
 
+        // Determine bucket this spout instance will be in charge of
         int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
         if (totalTasks > 1) {
             logIdprefix =
@@ -107,9 +119,13 @@ protected void populateBuffer() {
         ResultSet rs = null;
 
         try {
+            // Select entries from MySQL
+            // https://mariadb.com/kb/en/library/window-functions-overview/
+            // http://www.mysqltutorial.org/mysql-window-functions/mysql-rank-function/
             StringBuilder queryBuilder = new StringBuilder();
             queryBuilder.append("SELECT * FROM (");
-            queryBuilder.append("SELECT RANK() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, ");
+            queryBuilder.append(
+                    "SELECT RANK() OVER (PARTITION BY host ORDER BY nextfetchdate DESC, url) AS ranking, ");
             queryBuilder.append("url, metadata, nextfetchdate FROM ").append(tableName);
             queryBuilder.append(" WHERE nextfetchdate <= ?");
             if (bucketNum >= 0) {
@@ -149,6 +165,7 @@ protected void populateBuffer() {
             String url = rs.getString("url");
             numhits++;
 
+            // Already processed? Skip
             if (beingProcessed.containsKey(url)) {
                 alreadyprocessed++;
                 continue;
@@ -167,6 +184,7 @@ protected void populateBuffer() {
                 buffer.add(url, (Metadata) v.get(1));
             }
 
+            // No results? Reset the date
             if (numhits == 0) {
                 lastNextFetchDate = null;
             }
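---

Note on the pattern both commits converge on: values are bound through
PreparedStatement placeholders, while identifiers (the table name from
sql.index.table and the metadata column names) are still concatenated into the
SQL text, since JDBC cannot bind identifiers as parameters. Below is a minimal,
self-contained sketch of the same MySQL upsert pattern, for reference only: the
connection URL, credentials, table and column names are illustrative, not taken
from the patches (the real bolt obtains its connection via
SQLUtil.getConnection(conf)), and a MySQL JDBC driver on the classpath plus
MySQL's ON DUPLICATE KEY UPDATE syntax are assumed.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class UpsertSketch {
        public static void main(String[] args) throws SQLException {
            // Identifiers cannot be sent as '?' parameters, so they are part of
            // the SQL text; only the values go through placeholders.
            String sql =
                    "INSERT INTO urls (url, title) VALUES (?, ?)"
                            + " ON DUPLICATE KEY UPDATE title=VALUES(title)";
            // Hypothetical connection details, for illustration only.
            try (Connection connection =
                            DriverManager.getConnection(
                                    "jdbc:mysql://localhost:3306/crawl", "user", "pass");
                    PreparedStatement stmt = connection.prepareStatement(sql)) {
                stmt.setString(1, "https://example.com/"); // value for the url column
                stmt.setString(2, "Example"); // value for the title column
                stmt.executeUpdate();
            }
        }
    }

Because the identifiers still come from configuration and metadata keys, those
inputs must remain trusted; the placeholders only protect the URL and metadata
values in IndexerBolt and the timestamp, bucket, and limit values in SQLSpout.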