diff --git a/src/main/java/com/sforce/async/BulkConnection.java b/src/main/java/com/sforce/async/BulkConnection.java index aeb307cb..f7c2d8b0 100644 --- a/src/main/java/com/sforce/async/BulkConnection.java +++ b/src/main/java/com/sforce/async/BulkConnection.java @@ -63,11 +63,12 @@ import com.sforce.ws.parser.XmlOutputStream; import com.sforce.ws.transport.Transport; import com.sforce.ws.util.FileUtil; +import com.sforce.ws.util.Verbose; /** * BulkConnection - * + * * @author mcheenath * @since 160 */ @@ -137,13 +138,13 @@ private JobInfo createOrUpdateJob(JobInfo job, String endpoint) throws AsyncApiE } private JobInfo createOrUpdateJob(JobInfo job, String endpoint, ContentType contentType) throws AsyncApiException { + OutputStream out = null; + InputStream in = null; try { Transport transport = config.createTransport(); - OutputStream out; if (contentType == ContentType.ZIP_JSON || contentType == ContentType.JSON) { out = transport.connect(endpoint, getHeaders(JSON_CONTENT_TYPE)); serializeToJson (out, job); - out.close(); } else { out = transport.connect(endpoint, getHeaders(XML_CONTENT_TYPE)); XmlOutputStream xout = new AsyncXmlOutputStream(out, true); @@ -151,7 +152,7 @@ private JobInfo createOrUpdateJob(JobInfo job, String endpoint, ContentType cont xout.close(); } - InputStream in = transport.getContent(); + in = transport.getContent(); if (transport.isSuccessful()) { if (contentType == ContentType.ZIP_JSON || contentType == ContentType.JSON) { @@ -166,12 +167,23 @@ private JobInfo createOrUpdateJob(JobInfo job, String endpoint, ContentType cont } else { parseAndThrowException(in, contentType); } - } catch (PullParserException e) { - throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); - } catch (IOException e) { - throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { 
throw new AsyncApiException("Failed to create job ", AsyncExceptionCode.ClientInputError, e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + Verbose.log("Failed to close output stream"); + } + } + if (in != null) { + try { + in.close(); + } catch (IOException e) { + Verbose.log("Failed to close input stream"); + } + } } return null; } @@ -194,11 +206,7 @@ static void parseAndThrowException(InputStream in, ContentType type) throws Asyn } throw exception; - } catch (PullParserException e) { - throw new AsyncApiException("Failed to parse exception ", AsyncExceptionCode.ClientInputError, e); - } catch (IOException e) { - throw new AsyncApiException("Failed to parse exception", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to parse exception ", AsyncExceptionCode.ClientInputError, e); } } @@ -223,6 +231,8 @@ public BatchInfo createBatchFromZipStream(JobInfo jobInfo, InputStream zipInput) private BatchInfo createBatchFromStreamImpl(JobInfo jobInfo, InputStream input, boolean isZip) throws AsyncApiException { + InputStream result = null; + try { String endpoint = getRestEndpoint(); Transport transport = config.createTransport(); @@ -235,7 +245,7 @@ private BatchInfo createBatchFromStreamImpl(JobInfo jobInfo, InputStream input, FileUtil.copy(input, out); - InputStream result = transport.getContent(); + result = transport.getContent(); if (!transport.isSuccessful()) parseAndThrowException(result, jobInfo.getContentType()); //xml/json content type if (jobInfo.getContentType() == ContentType.JSON || jobInfo.getContentType() == ContentType.ZIP_JSON) @@ -243,12 +253,16 @@ private BatchInfo createBatchFromStreamImpl(JobInfo jobInfo, InputStream input, return BatchRequest.loadBatchInfo(result); - } catch (IOException e) { - throw new AsyncApiException("Failed to create batch",
AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } finally { + if (result != null) { + try { + result.close(); + } catch (IOException e) { + Verbose.log("Failed to close result stream"); + } + } } } @@ -307,14 +321,16 @@ public BatchInfo createBatchWithInputStreamAttachments(JobInfo jobInfo, InputStr if (batchContent != null && attachments.get("request.txt") != null) throw new AsyncApiException("Request content cannot be included as both input stream and attachment", AsyncExceptionCode.ClientInputError); + + InputStream result = null; + try { String endpoint = getRestEndpoint(); endpoint = endpoint + "job/" + jobInfo.getId() + "/batch"; Transport transport = config.createTransport(); - ZipOutputStream zipOut = new ZipOutputStream(transport.connect(endpoint, getHeaders(getContentTypeString( - jobInfo.getContentType(), true)), false)); - try { + try (ZipOutputStream zipOut = new ZipOutputStream(transport.connect(endpoint, + getHeaders(getContentTypeString(jobInfo.getContentType(), true)), false))) { if (batchContent != null) { zipOut.putNextEntry(new ZipEntry("request.txt")); FileUtil.copy(batchContent, zipOut, false); @@ -323,28 +339,30 @@ public BatchInfo createBatchWithInputStreamAttachments(JobInfo jobInfo, InputStr zipOut.putNextEntry(new ZipEntry(entry.getKey())); FileUtil.copy(entry.getValue(), zipOut, false); } - } finally { - zipOut.close(); } - InputStream result = transport.getContent(); + result = transport.getContent(); if (jobInfo.getContentType() == ContentType.JSON || jobInfo.getContentType() == ContentType.ZIP_JSON) { return deserializeJsonToObject(result, BatchInfo.class); } return BatchRequest.loadBatchInfo(result); - } 
catch (IOException e) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); + } finally { + if (result != null) { + try { + result.close(); + } catch (IOException e) { + Verbose.log("Failed to close result stream"); + } + } } } /* - * Creates a compliant Async Api batch from a stream containing an arbitrary CSV source (eg. Outlook contacts). + * Creates a compliant Async Api batch from a stream containing an arbitrary CSV source (eg. Outlook contacts). * The stream does not have to be UTF-8, and it's contents are transformed into a compliant - * batch using the previously created transformation specification (a mapping of columns to fields). + * batch using the previously created transformation specification (a mapping of columns to fields). * The stream is still limited according to the same limit rules as apply to normal batches. 
*/ public BatchInfo createBatchFromForeignCsvStream(JobInfo jobInfo, InputStream input, String charSet) throws AsyncApiException { @@ -363,11 +381,7 @@ public BatchInfo createBatchFromForeignCsvStream(JobInfo jobInfo, InputStream in InputStream result = transport.getContent(); if (!transport.isSuccessful()) parseAndThrowException(result, jobInfo.getContentType()); return BatchRequest.loadBatchInfo(result); - } catch (IOException e) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); } } @@ -380,9 +394,10 @@ public BatchInfo createBatchFromForeignCsvStream(JobInfo jobInfo, InputStream in * Salesforce Field,Csv Header,Value,Hint * LastName,Surname,#N/A, * Birthdate,Birthday,,MM-dd-YYYY - * + * */ public void createTransformationSpecFromStream(JobInfo jobInfo, InputStream input) throws AsyncApiException { + InputStream result = null; try { String endpoint = getRestEndpoint(); Transport transport = config.createTransport(); @@ -394,12 +409,18 @@ public void createTransformationSpecFromStream(JobInfo jobInfo, InputStream inpu FileUtil.copy(input, out); - InputStream result = transport.getContent(); + result = transport.getContent(); if (!transport.isSuccessful()) parseAndThrowException(result, jobInfo.getContentType()); - } catch (IOException e) { - throw new AsyncApiException("Failed to create transformation specification", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException e) { throw new AsyncApiException("Failed to create transformation specification", AsyncExceptionCode.ClientInputError, e); + } finally { + if (result != 
null) { + try { + result.close(); + } catch (IOException e) { + Verbose.log("Failed to close result stream"); + } + } } } @@ -479,10 +500,8 @@ public BatchRequest createBatch(JobInfo job) throws AsyncApiException { } OutputStream out = transport.connect(endpoint, getHeaders(jobContentType)); return new BatchRequest(transport, out); - } catch (IOException e) { + } catch (ConnectionException | IOException e) { throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException x) { - throw new AsyncApiException("Failed to create batch", AsyncExceptionCode.ClientInputError, x); } } @@ -525,12 +544,9 @@ public TransformationSpecRequest createTransformationSpec(JobInfo job) throws As public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { return getBatchInfoList(jobId, ContentType.XML); } - public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException { - try { - String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/"; - URL url = new URL(endpoint); - InputStream stream = doHttpGet(url); + public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException { + try (InputStream stream = getBatchInfoListStream(jobId)) { if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { return deserializeJsonToObject(stream, BatchInfoList.class); } else { @@ -540,11 +556,7 @@ public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) thr result.load(xin, typeMapper); return result; } - } catch (IOException e) { - throw new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new 
AsyncApiException("Failed to get batch info list ", AsyncExceptionCode.ClientInputError, e); } } @@ -554,11 +566,7 @@ public BatchInfo getBatchInfo(String jobId, String batchId) throws AsyncApiExcep } public BatchInfo getBatchInfo(String jobId, String batchId, ContentType contentType) throws AsyncApiException { - try { - String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId; - URL url = new URL(endpoint); - InputStream stream = doHttpGet(url); - + try (InputStream stream = getBatchInfoStream(jobId, batchId)) { if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { return deserializeJsonToObject(stream, BatchInfo.class); } else { @@ -568,11 +576,7 @@ public BatchInfo getBatchInfo(String jobId, String batchId, ContentType contentT result.load(xin, typeMapper); return result; } - } catch (IOException e) { - throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to parse batch info ", AsyncExceptionCode.ClientInputError, e); } } @@ -580,10 +584,9 @@ public BatchInfo getBatchInfo(String jobId, String batchId, ContentType contentT public BatchResult getBatchResult(String jobId, String batchId) throws AsyncApiException { return getBatchResult(jobId, batchId, ContentType.XML); } - public BatchResult getBatchResult(String jobId, String batchId, ContentType contentType) throws AsyncApiException { - try { - InputStream stream = doHttpGet(buildBatchResultURL(jobId, batchId)); + public BatchResult getBatchResult(String jobId, String batchId, ContentType contentType) throws AsyncApiException { + try (InputStream stream = doHttpGet(buildBatchResultURL(jobId, batchId))){ if (contentType == ContentType.JSON || 
contentType == ContentType.ZIP_JSON) { BatchResult batchResult = new BatchResult(); Result[] results = deserializeJsonToObject(stream, Result[].class); @@ -598,9 +601,7 @@ public BatchResult getBatchResult(String jobId, String batchId, ContentType cont } } catch (PullParserException e) { throw new AsyncApiException("Failed to parse result ", AsyncExceptionCode.ClientInputError, e); - } catch (IOException e) { - throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException e) { throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); } } @@ -615,6 +616,36 @@ public InputStream getBatchResultStream(String jobId, String batchId) throws Asy } } + public InputStream getBatchInfoListStream(String jobId) throws AsyncApiException { + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/"; + URL url = new URL(endpoint); + return doHttpGet(url); + } catch (IOException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } + + public InputStream getBatchInfoStream(String jobId, String batchId) throws AsyncApiException { + try { + String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId; + URL url = new URL(endpoint); + return doHttpGet(url); + } catch (IOException e) { + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } + + public InputStream getJobStatusStream(String jobId) throws AsyncApiException { + try { + String endpoint = getRestEndpoint() + "job/" + jobId; + URL url = new URL(endpoint); + return doHttpGet(url); + } catch(IOException e) { + throw new AsyncApiException("Failed to get request ", AsyncExceptionCode.ClientInputError, e); + } + } + public URL buildBatchResultURL(String jobId, String batchId) throws AsyncApiException { try { return new URL(getRestEndpoint() + "job/" + 
jobId + "/batch/" + batchId + "/result"); @@ -623,7 +654,8 @@ public URL buildBatchResultURL(String jobId, String batchId) throws AsyncApiExce AsyncExceptionCode.ClientInputError, e); } } - + + public InputStream getBatchRequestInputStream(String jobId, String batchId) throws AsyncApiException { try { String endpoint = getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/request"; @@ -639,9 +671,7 @@ public QueryResultList getQueryResultList(String jobId, String batchId) throws A } public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws AsyncApiException { - InputStream stream = getBatchResultStream(jobId, batchId); - - try { + try (InputStream stream = getBatchResultStream(jobId, batchId)) { if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { String[] results = deserializeJsonToObject(stream, String[].class); QueryResultList list = new QueryResultList(); @@ -654,11 +684,7 @@ public QueryResultList getQueryResultList(String jobId, String batchId, ContentT result.load(xin, typeMapper); return result; } - } catch (ConnectionException e) { - throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); - } catch (PullParserException e) { - throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); - } catch (IOException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to parse query result list ", AsyncExceptionCode.ClientInputError, e); } } @@ -670,7 +696,7 @@ public InputStream getQueryResultStream(String jobId, String batchId, String res throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); } } - + public URL buildQueryResultURL(String jobId, String batchId, String resultId) throws AsyncApiException { try { return new URL(getRestEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + 
"/" + resultId); @@ -762,13 +788,7 @@ public JobInfo getJobStatus(String jobId) throws AsyncApiException { } public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException { - try { - String endpoint = getRestEndpoint(); - endpoint += "job/" + jobId; - URL url = new URL(endpoint); - - InputStream in = doHttpGet(url); - + try (InputStream in = getJobStatusStream(jobId)) { if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { return deserializeJsonToObject(in, JobInfo.class); } else { @@ -778,11 +798,7 @@ public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncA result.load(xin, typeMapper); return result; } - } catch (PullParserException e) { - throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); - } catch (IOException e) { - throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); - } catch (ConnectionException e) { + } catch (ConnectionException | IOException | PullParserException e) { throw new AsyncApiException("Failed to get job status ", AsyncExceptionCode.ClientInputError, e); } } diff --git a/src/main/java/com/sforce/bulk/UpdateResultStream.java b/src/main/java/com/sforce/bulk/UpdateResultStream.java index 51e9cdf3..12e7f5d7 100644 --- a/src/main/java/com/sforce/bulk/UpdateResultStream.java +++ b/src/main/java/com/sforce/bulk/UpdateResultStream.java @@ -139,10 +139,8 @@ private void loadNextBatch() throws StreamException { waitForNextBatch(); while(handler.shouldContinue()) { - try { - InputStream resultStream = - bulkConnection.getBatchResultStream(job.getId(), batchList[batchIndex].getId()); - + try (InputStream resultStream = + bulkConnection.getBatchResultStream(job.getId(), batchList[batchIndex].getId())) { resultReader = new CSVReader(resultStream); resultReader.nextRecord(); //comsume header break; diff --git a/src/main/java/com/sforce/bulk/UpdateStream.java 
b/src/main/java/com/sforce/bulk/UpdateStream.java index 3b449ee6..010b1a86 100644 --- a/src/main/java/com/sforce/bulk/UpdateStream.java +++ b/src/main/java/com/sforce/bulk/UpdateStream.java @@ -155,6 +155,7 @@ private void createBatch() throws StreamException { BatchInfo batch = bulkConnection.createBatchFromStream(job, new ByteArrayInputStream(writer.getBuffer().toString().getBytes())); + handler.info("Batch created with ID: " + batch.getId()); writer = null; csvWriter = null;