Skip to content

Commit

Permalink
Rethrowing Exception from CassandraIO's ReadFn
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle committed Mar 7, 2025
1 parent c9b55a3 commit f9da3f9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
* (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)).
* (Java) Fixed TIME field encodings for BigQuery Storage API writes on GenericRecords ([#34059](https://github.com/apache/beam/pull/34059)).
* (Java) Fixed a race condition in JdbcIO which could cause hangs trying to acquire a connection ([#34058](https://github.com/apache/beam/pull/34058)).
* (Java) Fixed cassandraIO ReadAll does not let a pipeline handle or retry exceptions ([#34191](https://github.com/apache/beam/pull/34191)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);

@ProcessElement
public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) throws Exception {
try {
Session session = ConnectionManager.getSession(read);
Mapper<T> mapper = read.mapperFactoryFn().apply(session);
Expand Down Expand Up @@ -89,6 +89,7 @@ public void processElement(@Element Read<T> read, OutputReceiver<T> receiver) {
}
} catch (Exception ex) {
LOG.error("error", ex);
throw ex;
}
}

Expand All @@ -107,7 +108,7 @@ private static String getHighestSplitQuery(
String finalHighQuery =
(spec.query() == null)
? buildInitialQuery(spec, true) + highestClause
: spec.query() + getJoinerClause(spec.query().get()) + highestClause;
: spec.query().get() + getJoinerClause(spec.query().get()) + highestClause;
LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery);
return finalHighQuery;
}
Expand All @@ -117,7 +118,7 @@ private static String getLowestSplitQuery(Read<?> spec, String partitionKey, Big
String finalLowQuery =
(spec.query() == null)
? buildInitialQuery(spec, true) + lowestClause
: spec.query() + getJoinerClause(spec.query().get()) + lowestClause;
: spec.query().get() + getJoinerClause(spec.query().get()) + lowestClause;
LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery);
return finalLowQuery;
}
Expand Down

0 comments on commit f9da3f9

Please sign in to comment.