From aefcabae5ce40f533ecbef9c8727b6fce510e9c7 Mon Sep 17 00:00:00 2001 From: sszuev Date: Fri, 9 Aug 2024 22:56:48 +0300 Subject: [PATCH] ont-api: add BufferedHeadInputStream, change OntGraphUtils#readGraph to ignore OWLXML (workaround for https://github.com/apache/jena/issues/2620) --- .../owlcs/ontapi/BufferedHeadInputStream.java | 104 ++++++++++ .../github/owlcs/ontapi/OntGraphUtils.java | 24 ++- .../tests/BufferedHeadInputStreamTest.java | 184 ++++++++++++++++++ 3 files changed, 304 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/github/owlcs/ontapi/BufferedHeadInputStream.java create mode 100644 src/test/java/com/github/owlcs/ontapi/tests/BufferedHeadInputStreamTest.java diff --git a/src/main/java/com/github/owlcs/ontapi/BufferedHeadInputStream.java b/src/main/java/com/github/owlcs/ontapi/BufferedHeadInputStream.java new file mode 100644 index 00000000..d65a46d9 --- /dev/null +++ b/src/main/java/com/github/owlcs/ontapi/BufferedHeadInputStream.java @@ -0,0 +1,104 @@ +package com.github.owlcs.ontapi; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Objects; + +/** + * An InputStream with cached head. + */ +public class BufferedHeadInputStream extends InputStream { + private final InputStream source; + private final byte[] head; + private final int headEnd; + private int headPos = 0; + private boolean closed; + + public BufferedHeadInputStream(InputStream source, int cacheSize) { + this.source = Objects.requireNonNull(source); + this.head = new byte[cacheSize]; + try { + headEnd = source.read(head, 0, head.length); + } catch (IOException e) { + throw new UncheckedIOException("Failed to fill cache", e); + } + } + + public byte[] head() { + return head; + } + + private void checkOpen() throws IOException { + if (closed) { + throw new IOException("Stream is already closed"); + } + } + + @Override + public void close() throws IOException { + source.close(); + closed = true; + } + + @Override + public int read() throws IOException { + checkOpen(); + if (headPos < headEnd) { + return head[headPos++] & 0xFF; + } else { + return source.read(); + } + } + + @Override + public int read(@Nonnull byte[] b, int off, int len) throws IOException { + checkOpen(); + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + int bytesRead = 0; + if (headPos < headEnd) { + int bytesToReadFromCache = Math.min(len, headEnd - headPos); + System.arraycopy(head, headPos, b, off, bytesToReadFromCache); + headPos += bytesToReadFromCache; + bytesRead += bytesToReadFromCache; + off += bytesToReadFromCache; + len -= bytesToReadFromCache; + } + + if (len > 0) { + int bytesToReadFromSource = source.read(b, off, len); + if (bytesToReadFromSource > 0) { + bytesRead += bytesToReadFromSource; + } + } + + return bytesRead == 0 ? -1 : bytesRead; + } + + @Override + public int available() throws IOException { + checkOpen(); + return (headEnd - headPos) + source.available(); + } + + @Override + public long skip(long n) throws IOException { + checkOpen(); + long skipped = 0; + if (headPos < headEnd) { + int bytesToSkipInCache = (int) Math.min(n, headEnd - headPos); + headPos += bytesToSkipInCache; + skipped += bytesToSkipInCache; + n -= bytesToSkipInCache; + } + if (n > 0) { + skipped += source.skip(n); + } + return skipped; + } +} diff --git a/src/main/java/com/github/owlcs/ontapi/OntGraphUtils.java b/src/main/java/com/github/owlcs/ontapi/OntGraphUtils.java index da568479..84a8d929 100644 --- a/src/main/java/com/github/owlcs/ontapi/OntGraphUtils.java +++ b/src/main/java/com/github/owlcs/ontapi/OntGraphUtils.java @@ -63,7 +63,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Reader; -import java.io.StringReader; import java.io.StringWriter; import java.io.Writer; import java.net.URL; @@ -366,7 +365,7 @@ public static OntFormat readGraph(Graph graph, if (LOGGER.isDebugEnabled()) { LOGGER.debug("read {}, try <{}>", iri, lang); } - readGraph(graph, stream, iri.toString(), lang); + RDFDataMgr.read(graph, getInputStream(format, stream), iri.toString(), lang); return format; } catch (OWLOntologyInputSourceException | IOException e) { throw new OWLOntologyCreationException(source.getClass().getSimpleName() + @@ -384,14 +383,23 @@ public static OntFormat readGraph(Graph graph, throw error; } - protected static void readGraph(Graph graph, Closeable stream, String base, Lang lang) { - if (stream instanceof InputStream) { - RDFDataMgr.read(graph, (InputStream) stream, base, lang); - } else if (stream instanceof StringReader) { - RDFDataMgr.read(graph, (StringReader) stream, base, lang); + private static InputStream getInputStream(OntFormat format, Closeable stream) { + InputStream res; + if (stream instanceof Reader) { + res = new ReaderInputStream((Reader) stream, StandardCharsets.UTF_8); } else { - RDFDataMgr.read(graph, new ReaderInputStream((Reader) stream, StandardCharsets.UTF_8), base, lang); + res = (InputStream) stream; } + if (format == OntFormat.RDF_XML) { + // check if it is OWLXML + BufferedHeadInputStream is = new BufferedHeadInputStream(res, 8192); + String head = new String(is.head(), StandardCharsets.UTF_8); + if (head.contains(" rewindableStream.read(buffer, 11, 1)); + } + + @Test + void testReadWithLengthGreaterThanBufferLength() { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + byte[] buffer = new byte[10]; + Assertions.assertThrows(IndexOutOfBoundsException.class, () -> rewindableStream.read(buffer, 0, 11)); + } + + @Test + void testReadWithOffsetPlusLengthGreaterThanBufferLength() { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + byte[] buffer = new byte[10]; + Assertions.assertThrows(IndexOutOfBoundsException.class, () -> rewindableStream.read(buffer, 6, 5)); + } + + @Test + void testReadWhenEndOfStream() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + byte[] buffer = new byte[testData.length()]; + int bytesRead = rewindableStream.read(buffer); + Assertions.assertEquals(testData.length(), bytesRead); + + bytesRead = rewindableStream.read(buffer); + Assertions.assertEquals(-1, bytesRead); + } + + @Test + void testReadWithLargeBuffer() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 5); + byte[] buffer = new byte[50]; + int bytesRead = rewindableStream.read(buffer); + Assertions.assertEquals(testData.length(), bytesRead); + Assertions.assertArrayEquals(testData.getBytes(StandardCharsets.UTF_8), Arrays.copyOfRange(buffer, 0, bytesRead)); + } + + @Test + void testReadWithSmallBuffer() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 20); + byte[] buffer = new byte[4]; + int bytesRead1 = rewindableStream.read(buffer); + Assertions.assertEquals(4, bytesRead1); + Assertions.assertArrayEquals("This".getBytes(StandardCharsets.UTF_8), buffer); + int bytesRead2 = rewindableStream.read(buffer); + Assertions.assertEquals(4, bytesRead2); + Assertions.assertArrayEquals(" is ".getBytes(StandardCharsets.UTF_8), buffer); + } + + @Test + void testReadWithExactCacheSize() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, testData.length()); + byte[] buffer = new byte[testData.length()]; + int bytesRead = rewindableStream.read(buffer); + Assertions.assertEquals(testData.length(), bytesRead); + Assertions.assertArrayEquals(testData.getBytes(StandardCharsets.UTF_8), buffer); + } + + @Test + void testReadWithCacheSmallerThanData() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 5); + byte[] buffer = new byte[10]; + int bytesRead1 = rewindableStream.read(buffer); + Assertions.assertEquals(10, bytesRead1); + Assertions.assertArrayEquals("This is a ".getBytes(StandardCharsets.UTF_8), buffer); + int bytesRead2 = rewindableStream.read(buffer); + Assertions.assertEquals(10, bytesRead2); + Assertions.assertArrayEquals("test input".getBytes(StandardCharsets.UTF_8), buffer); + } + + @Test + void testReadWithCacheLargerThanData() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 50); + byte[] buffer = new byte[testData.length()]; + int bytesRead = rewindableStream.read(buffer); + Assertions.assertEquals(testData.length(), bytesRead); + Assertions.assertArrayEquals(testData.getBytes(StandardCharsets.UTF_8), buffer); + } + + @Test + void testAvailable() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + + int available = rewindableStream.available(); + Assertions.assertEquals(testData.length(), available); + + byte[] buffer = new byte[10]; + Assertions.assertEquals(10, rewindableStream.read(buffer)); + available = rewindableStream.available(); + Assertions.assertEquals(testData.length() - 10, available); + } + + @Test + void testSkip() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + + long skipped = rewindableStream.skip(5); + Assertions.assertEquals(5, skipped); + + int nextByte = rewindableStream.read(); + Assertions.assertEquals('i', nextByte); + + skipped = rewindableStream.skip(testData.length()); + Assertions.assertEquals(testData.length() - 6, skipped); + + nextByte = rewindableStream.read(); + Assertions.assertEquals(-1, nextByte); // End of stream + } + + @Test + void testClose() throws IOException { + String testData = "This is a test input stream"; + InputStream originalStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + BufferedHeadInputStream rewindableStream = new BufferedHeadInputStream(originalStream, 10); + rewindableStream.close(); + Assertions.assertThrows(IOException.class, rewindableStream::read); + } +}