diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/JdbcDataStore.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/JdbcDataStore.java index 69e9265a9..129a29295 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/JdbcDataStore.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/JdbcDataStore.java @@ -187,8 +187,8 @@ public Map get(String[] paths) { } String command = sb.append(")").toString(); Map rv = new HashMap<>(); - try (Connection conn = cpds.getConnection()) { - PreparedStatement preparedStatement = conn.prepareStatement(command); + try (Connection conn = cpds.getConnection(); + PreparedStatement preparedStatement = conn.prepareStatement(command)) { // The first condition is that the child value is blankChildValue preparedStatement.setString(1, blankChildValue); int j = 2; diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/MysqlDataStore.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/MysqlDataStore.java index e75c27509..d643c980a 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/MysqlDataStore.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/MysqlDataStore.java @@ -53,11 +53,12 @@ public MysqlDataStore(String jdbcUrl, String dbName, String tableName, Propertie @Override protected void runSetupDatabaseCommand(final String dbName, final String jdbcUrl, final Properties properties) throws SQLException { - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties)) { + String dbSetupCommand = String.format("CREATE DATABASE IF NOT EXISTS %s", dbName); + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + PreparedStatement preparedStatement = connection.prepareStatement(dbSetupCommand)) { // Create a connection that excludes the database from the jdbc url. // This is necessary to create the database in the event that it does not exist. - String dbSetupCommand = String.format("CREATE DATABASE IF NOT EXISTS %s", dbName); - connection.prepareStatement(dbSetupCommand).execute(); + preparedStatement.execute(); } } @@ -66,15 +67,16 @@ protected void runSetupDatabaseCommand(final String dbName, final String jdbcUrl */ @Override protected void runSetupTableCommand() throws SQLException { - try (Connection connection = cpds.getConnection()) { - String tableSetupCommand = String.format("CREATE TABLE IF NOT EXISTS %s ( " - + "%s INT NOT NULL AUTO_INCREMENT, " + // Auto-incrementing int id - "%s VARCHAR(%d) NOT NULL, %s MEDIUMBLOB, %s VARCHAR(%d), " + // VARCHAR path, BLOB value, VARCHAR child - "PRIMARY KEY (%s), UNIQUE KEY (%s,%s)) " + // Use id as primary key, enforce unique (path, child) combo - "ENGINE=%s", // Use specified table type (MyISAM works best in practice) - tableName, getIdKey(), getPathKey(), getMaxPathLength(), getValueKey(), getChildKey(), - getMaxPathLength(), getIdKey(), getPathKey(), getChildKey(), tableType); - connection.prepareStatement(tableSetupCommand).execute(); + String tableSetupCommand = String.format("CREATE TABLE IF NOT EXISTS %s ( " + + "%s INT NOT NULL AUTO_INCREMENT, " + // Auto-incrementing int id + "%s VARCHAR(%d) NOT NULL, %s MEDIUMBLOB, %s VARCHAR(%d), " + // VARCHAR path, BLOB value, VARCHAR child + "PRIMARY KEY (%s), UNIQUE KEY (%s,%s)) " + // Use id as primary key, enforce unique (path, child) combo + "ENGINE=%s", // Use specified table type (MyISAM works best in practice) + tableName, getIdKey(), getPathKey(), getMaxPathLength(), getValueKey(), getChildKey(), + getMaxPathLength(), getIdKey(), getPathKey(), getChildKey(), tableType); + try (Connection connection = cpds.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(tableSetupCommand)) { + preparedStatement.execute(); } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/store/PostgresDataStore.java b/hydra-main/src/main/java/com/addthis/hydra/job/store/PostgresDataStore.java index 1024935dd..383f2f2b2 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/store/PostgresDataStore.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/store/PostgresDataStore.java @@ -43,11 +43,12 @@ public PostgresDataStore(String jdbcUrl, String dbName, String tableName, Proper @Override protected void runSetupDatabaseCommand(String dbName, String jdbcUrl, Properties properties) throws SQLException { - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties)) { + String dbSetupCommand = String.format("CREATE DATABASE %s", dbName); + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + PreparedStatement preparedStatement = connection.prepareStatement(dbSetupCommand)) { // Create a connection that excludes the database from the jdbc url. // This is necessary to create the database in the event that it does not exist. - String dbSetupCommand = String.format("CREATE DATABASE %s", dbName); - connection.prepareStatement(dbSetupCommand).execute(); + preparedStatement.execute(); } catch (final SQLException se) { //the database may already exists if (se.getMessage().endsWith("already exists")) { @@ -60,34 +61,37 @@ protected void runSetupDatabaseCommand(String dbName, String jdbcUrl, Properties @Override protected void runSetupTableCommand() throws SQLException { - try (final Connection connection = cpds.getConnection()) { - final String tableSetupCommand = new StringBuffer("CREATE TABLE IF NOT EXISTS ").append(tableName).append(" ( ") - .append(getIdKey()).append(" SERIAL PRIMARY KEY, ") // Auto-incrementing int id - .append(getPathKey()).append(" VARCHAR(").append(getMaxPathLength()).append(") NOT NULL, ") // VARCHAR path - .append(getValueKey()).append(" VARCHAR, ")//VARCHAR value - .append(getChildKey()).append(" VARCHAR(").append(getMaxPathLength()).append("), ") // VARCHAR child - .append("CONSTRAINT parent_child UNIQUE (").append(getPathKey()).append(", ").append(getChildKey()).append(")) ") // enforce unique (path, child) combo - .toString(); - connection.prepareStatement(tableSetupCommand).execute(); - final String replaceFunctionSetupCommand = String.format( - "CREATE OR REPLACE FUNCTION replace_entry(%s VARCHAR, %s VARCHAR, %s VARCHAR) RETURNS void AS $$\n" - + "BEGIN\n" - + " IF EXISTS( SELECT * FROM %s WHERE %s = %s ) THEN\n" - + " UPDATE %s\n" - + " SET %s = %s, %s = %s WHERE %s = %s;\n" - + " ELSE\n" - + " INSERT INTO %s(%s, %s, %s) VALUES( %s, %s, %s );\n" - + " END IF;\n" - + "\n" - + " RETURN;\n" - + "END;\n" - + "$$ LANGUAGE plpgsql;", - getPathKey() + "Var", getValueKey() + "Var", getChildKey() + "Var", - tableName, getPathKey(), getPathKey() + "Var", - tableName, - getValueKey(), getValueKey() + "Var", getChildKey(), getChildKey() + "Var" ,getPathKey(), getPathKey() + "Var", - tableName, getPathKey(), getValueKey(), getChildKey(), getPathKey() + "Var", getValueKey() + "Var", getChildKey() + "Var"); - connection.prepareStatement(replaceFunctionSetupCommand).execute(); + final String tableSetupCommand = new StringBuffer("CREATE TABLE IF NOT EXISTS ").append(tableName).append(" ( ") + .append(getIdKey()).append(" SERIAL PRIMARY KEY, ") // Auto-incrementing int id + .append(getPathKey()).append(" VARCHAR(").append(getMaxPathLength()).append(") NOT NULL, ") // VARCHAR path + .append(getValueKey()).append(" VARCHAR, ")//VARCHAR value + .append(getChildKey()).append(" VARCHAR(").append(getMaxPathLength()).append("), ") // VARCHAR child + .append("CONSTRAINT parent_child UNIQUE (").append(getPathKey()).append(", ").append(getChildKey()).append(")) ") // enforce unique (path, child) combo + .toString(); + final String replaceFunctionSetupCommand = String.format( + "CREATE OR REPLACE FUNCTION replace_entry(%s VARCHAR, %s VARCHAR, %s VARCHAR) RETURNS void AS $$\n" + + "BEGIN\n" + + " IF EXISTS( SELECT * FROM %s WHERE %s = %s ) THEN\n" + + " UPDATE %s\n" + + " SET %s = %s, %s = %s WHERE %s = %s;\n" + + " ELSE\n" + + " INSERT INTO %s(%s, %s, %s) VALUES( %s, %s, %s );\n" + + " END IF;\n" + + "\n" + + " RETURN;\n" + + "END;\n" + + "$$ LANGUAGE plpgsql;", + getPathKey() + "Var", getValueKey() + "Var", getChildKey() + "Var", + tableName, getPathKey(), getPathKey() + "Var", + tableName, + getValueKey(), getValueKey() + "Var", getChildKey(), getChildKey() + "Var" ,getPathKey(), getPathKey() + "Var", + tableName, getPathKey(), getValueKey(), getChildKey(), getPathKey() + "Var", getValueKey() + "Var", getChildKey() + "Var"); + try (final Connection connection = cpds.getConnection(); + PreparedStatement preparedStatement1 = connection.prepareStatement(tableSetupCommand); + PreparedStatement preparedStatement2 = connection.prepareStatement(replaceFunctionSetupCommand)) { + preparedStatement1.execute(); + preparedStatement2.execute(); + try { final String createIndexCommand = String.format("CREATE INDEX ON %s (%s)", tableName, getPathKey()); connection.prepareStatement(createIndexCommand).execute(); diff --git a/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/AssetsResource.java b/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/AssetsResource.java index 7914c7a05..b2f36ce98 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/AssetsResource.java +++ b/hydra-main/src/main/java/com/addthis/hydra/job/web/resources/AssetsResource.java @@ -26,9 +26,9 @@ import java.io.File; import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; - import java.net.URI; import org.slf4j.Logger; @@ -71,9 +71,10 @@ public Response getIndex() { public Response getAsset(@PathParam("target") String target) { File file = new File(webDir, uriInfo.getPath()); if (file.exists() && file.isFile()) { + InputStream in = null; try { OutputStream out = response.getOutputStream(); - InputStream in = new FileInputStream(file); + in = new FileInputStream(file); byte[] buf = new byte[1024]; int read = 0; while ((read = in.read(buf)) >= 0) { @@ -86,6 +87,12 @@ public Response getAsset(@PathParam("target") String target) { return Response.ok().build(); } catch (Exception ex) { return Response.serverError().build(); + } finally { + try { + in.close(); + } catch (IOException e) { + log.error("Exception while closing resource", e); + } } } else { if (log.isDebugEnabled()) log.debug("[http.unhandled] " + uriInfo.getPath()); diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/LogUtils.java b/hydra-main/src/main/java/com/addthis/hydra/minion/LogUtils.java index aabf5746c..4dcc65811 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/LogUtils.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/LogUtils.java @@ -38,9 +38,10 @@ public final class LogUtils { /** Streams task log files from newest to oldest. The returned Stream should be closed. */ public static Stream streamTaskLogsByName(JobTask task) throws IOException { Path logDir = task.logDir.toPath(); - return Files.list(logDir) - .filter(path -> Files.isRegularFile(path, LinkOption.NOFOLLOW_LINKS)) - .sorted(Collections.reverseOrder()); + try(Stream stream = Files.list(logDir)) { + return stream.filter(path -> Files.isRegularFile(path, LinkOption.NOFOLLOW_LINKS)) + .sorted(Collections.reverseOrder()); + } } public static Optional getNthNewestLog(JobTask task, int runsAgo, String suffix) throws IOException { diff --git a/hydra-main/src/main/java/com/addthis/hydra/minion/ProcessUtils.java b/hydra-main/src/main/java/com/addthis/hydra/minion/ProcessUtils.java index ae2b74b96..8375c1db8 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/minion/ProcessUtils.java +++ b/hydra-main/src/main/java/com/addthis/hydra/minion/ProcessUtils.java @@ -128,16 +128,22 @@ private static Integer findActiveProcessWithTokens(String[] requireTokens, Strin command.append("| grep -v " + excludeToken); } command.append("| cut -c -5"); + Scanner scanner = null; try { SimpleExec exec = new SimpleExec(new String[]{"/bin/sh", "-c", command.toString()}).join(); if (exec.exitCode() == 0) { - return new Scanner(exec.stdoutString()).nextInt(); + scanner = new Scanner(exec.stdoutString());; + return scanner.nextInt(); } else { return null; } } catch (Exception e) { // No PID found return null; + } finally { + if(scanner != null) { + scanner.close(); + } } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java b/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java index b6af53eb9..74641f8e6 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/MeshQueryMaster.java @@ -18,9 +18,7 @@ import java.io.File; import java.io.IOException; - import java.net.InetSocketAddress; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -31,11 +29,11 @@ import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.addthis.basis.util.LessStreams; import com.addthis.basis.util.LessFiles; import com.addthis.basis.util.Parameter; - import com.addthis.hydra.data.query.Query; import com.addthis.hydra.data.query.QueryException; import com.addthis.hydra.query.aggregate.BalancedAllocator; @@ -50,7 +48,6 @@ import com.addthis.hydra.query.tracker.TrackerHandler; import com.addthis.meshy.MeshyServer; import com.addthis.meshy.service.file.FileReference; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; import com.google.common.base.Strings; @@ -246,10 +243,11 @@ protected void writeQuery(ChannelHandlerContext ctx, Query query, ChannelPromise if (Strings.isNullOrEmpty(tasks)) { return Collections.emptySet(); } else { - return LessStreams.stream(TASKS_SPLITTER.split(tasks)) - .map(Ints::tryParse) - .filter(i -> i != null) - .collect(Collectors.toSet()); + try(Stream stream = LessStreams.stream(TASKS_SPLITTER.split(tasks))) { + return stream.map(Ints::tryParse) + .filter(i -> i != null) + .collect(Collectors.toSet()); + } } } diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/web/GoogleDriveAuthentication.java b/hydra-main/src/main/java/com/addthis/hydra/query/web/GoogleDriveAuthentication.java index d28219a3e..e75e9470d 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/web/GoogleDriveAuthentication.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/web/GoogleDriveAuthentication.java @@ -100,21 +100,22 @@ private static void closeResource(@Nullable Closeable resource) { * Send an HTML formatted error message. */ private static void sendErrorMessage(ChannelHandlerContext ctx, String message) throws IOException { - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - response.headers().set(CONTENT_TYPE, "text/html; charset=utf-8"); - StringBuilderWriter writer = new StringBuilderWriter(50); - writer.append("Hydra Query Master"); - writer.append("

"); - writer.append(message); - writer.append("

"); - ByteBuf textResponse = ByteBufUtil.encodeString(ctx.alloc(), + try (StringBuilderWriter writer = new StringBuilderWriter(50)) { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(CONTENT_TYPE, "text/html; charset=utf-8"); + writer.append("Hydra Query Master"); + writer.append("

"); + writer.append(message); + writer.append("

"); + ByteBuf textResponse = ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(writer.getBuilder()), CharsetUtil.UTF_8); - HttpContent content = new DefaultHttpContent(textResponse); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, textResponse.readableBytes()); - ctx.write(response); - ctx.write(content); - ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - lastContentFuture.addListener(ChannelFutureListener.CLOSE); + HttpContent content = new DefaultHttpContent(textResponse); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, textResponse.readableBytes()); + ctx.write(response); + ctx.write(content); + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } } /** diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpQueryCallHandler.java b/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpQueryCallHandler.java index e31b21e32..144363eb2 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpQueryCallHandler.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpQueryCallHandler.java @@ -16,13 +16,12 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; - import java.util.Arrays; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.addthis.basis.kv.KVPairs; import com.addthis.basis.util.Parameter; - import com.addthis.hydra.data.query.Query; import com.addthis.hydra.data.query.source.ErrorHandlingQuerySource; import com.addthis.hydra.data.query.source.QuerySource; @@ -40,7 +39,6 @@ import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.EventExecutor; - import static com.addthis.hydra.query.web.HttpUtils.sendError; public final class HttpQueryCallHandler { @@ -65,8 +63,10 @@ public static ChannelFuture handleQuery(ChannelHandler queryToQueryResultsEncode if ((dir != null) && !job.endsWith(dir)) { String[] jobs = job.split(","); String[] dirs = dir.split(","); - job = Arrays.stream(jobs).flatMap(subJob -> Arrays.stream(dirs).map(subDir -> subJob + '/' + subDir)) + try (Stream stream = Arrays.stream(jobs)) { + stream.flatMap(subJob -> Arrays.stream(dirs).map(subDir -> subJob + '/' + subDir)) .collect(Collectors.joining(",")); + } } String path = kv.getValue("path", kv.getValue("q", "")); Query query = new Query(job, new String[]{path}, new String[]{kv.getValue("ops"), kv.getValue("rops")}); diff --git a/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpStaticFileHandler.java b/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpStaticFileHandler.java index c0dd2aad0..588572926 100644 --- a/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpStaticFileHandler.java +++ b/hydra-main/src/main/java/com/addthis/hydra/query/web/HttpStaticFileHandler.java @@ -179,35 +179,32 @@ public void channelRead0( log.trace("sending {}", file); - FileChannel fileChannel; + FileChannel fileChannel = null; try { fileChannel = FileChannel.open(file, StandardOpenOption.READ); - } catch (IOException fnfe) { - sendError(ctx, NOT_FOUND); - return; - } - long fileLength = fileChannel.size(); + long fileLength = fileChannel.size(); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentLength(response, fileLength); - setContentTypeHeader(response, file); - try { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentLength(response, fileLength); + setContentTypeHeader(response, file); setDateAndCacheHeaders(response, file); + if (isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + + // Write the initial line and the header. + ctx.write(response); + + // Write the content. + ctx.write(new DefaultFileRegion(fileChannel, 0, fileLength)); } catch (IOException ioex) { - fileChannel.close(); sendError(ctx, NOT_FOUND); return; + } finally { + if(fileChannel != null) { + fileChannel.close(); + } } - if (isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - - // Write the initial line and the header. - ctx.write(response); - - // Write the content. - ctx.write(new DefaultFileRegion(fileChannel, 0, fileLength)); - // Write the end marker ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);