diff --git a/kubernetes/k8.yaml b/kubernetes/k8.yaml index e0384f4..844c0c1 100644 --- a/kubernetes/k8.yaml +++ b/kubernetes/k8.yaml @@ -147,7 +147,7 @@ metadata: name: manager spec: containers: - - image: benblamey/hom-impl-2.manager:latest + - image: haoyuan9654/hom-impl-2.manager:latest # image is local-only atm. imagePullPolicy: Always name: manager diff --git a/manager/.idea/.gitignore b/manager/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/manager/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/manager/.idea/.name b/manager/.idea/.name new file mode 100644 index 0000000..63c9479 --- /dev/null +++ b/manager/.idea/.name @@ -0,0 +1 @@ +hom-impl-2.manager \ No newline at end of file diff --git a/manager/.idea/compiler.xml b/manager/.idea/compiler.xml new file mode 100644 index 0000000..659bf43 --- /dev/null +++ b/manager/.idea/compiler.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/manager/.idea/gradle.xml b/manager/.idea/gradle.xml new file mode 100644 index 0000000..d405dbe --- /dev/null +++ b/manager/.idea/gradle.xml @@ -0,0 +1,18 @@ + + + + + + + \ No newline at end of file diff --git a/manager/.idea/jarRepositories.xml b/manager/.idea/jarRepositories.xml new file mode 100644 index 0000000..fdc392f --- /dev/null +++ b/manager/.idea/jarRepositories.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/manager/.idea/misc.xml b/manager/.idea/misc.xml new file mode 100644 index 0000000..564e332 --- /dev/null +++ b/manager/.idea/misc.xml @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/manager/Dockerfile b/manager/Dockerfile index c1743e4..f7af69e 100644 --- a/manager/Dockerfile +++ b/manager/Dockerfile @@ -1,4 +1,6 @@ -FROM ubuntu:hirsute +FROM ubuntu:jammy +# clean and update sources +RUN apt-get -y update # Ubuntu 21.04 # with ubuntu:impish, get an error on apt update @@ -13,11 +15,11 @@ ENV TZ=Europe/Stockholm RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone # Always run update when changing package list, see https://docs.docker.com/develop/develop-images/dockerfile_best-practices/ -RUN apt update ; echo 'editthistoforcerun5' +#RUN apt update ; echo 'editthistoforcerun5' # install curl RUN apt install -y curl -RUN apt install -y openjdk-16-jre +RUN apt install -y openjdk-18-jre RUN java --version @@ -42,8 +44,8 @@ RUN curl -LO "https://dl.k8s.io/release/v1.22.3/bin/linux/amd64/kubectl" RUN install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl RUN kubectl version --client -COPY build/libs/hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar +COPY hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar # /usr/lib/jvm/jdk-17/bin/java -cp output.jar com.benblamey.hom.manager.ManagerMainTest #ENTRYPOINT ["java","-jar","output.jar"] -ENTRYPOINT ["/bin/bash"] \ No newline at end of file +ENTRYPOINT ["/bin/bash"] diff --git a/manager/build.gradle b/manager/build.gradle index 50334ae..5ec8766 100644 --- a/manager/build.gradle +++ b/manager/build.gradle @@ -30,6 +30,9 @@ dependencies { // https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1' + + // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml + implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: '2.13.3' } // Create a fat JAR to use with Docker, by overriding the default configuration for 'jar': @@ -70,4 +73,5 @@ pushDockerImage.dependsOn buildDockerImage test { useJUnitPlatform() -} \ No newline at end of file +} + diff --git a/manager/src/main/java/com/benblamey/hom/manager/CommandLineArguments.java b/manager/src/main/java/com/benblamey/hom/manager/CommandLineArguments.java index 4fcfca0..1ce48b7 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/CommandLineArguments.java +++ b/manager/src/main/java/com/benblamey/hom/manager/CommandLineArguments.java @@ -8,7 +8,7 @@ private static String getString(String argumentName) { private static String getString(String argumentName, String defaultValue) { // returns null if not specified. - String result = System.getProperty(argumentName); + String result = System.getenv(argumentName); if (result == null) { System.out.println(argumentName + " defaulted to " + defaultValue); return defaultValue; @@ -22,10 +22,11 @@ public static String getKafkaBootstrapServerConfig() { } public static String getDataPath() { - return getString("DATA_PATH"); + return getString("DATA_PATH", "/data/"); } public static String getMaxWorkerReplicas() { return getString("MAX_WORKER_REPLICAS", "3"); } + } diff --git a/manager/src/main/java/com/benblamey/hom/manager/InputTier.java b/manager/src/main/java/com/benblamey/hom/manager/InputTier.java index 276fdbb..d874bab 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/InputTier.java +++ b/manager/src/main/java/com/benblamey/hom/manager/InputTier.java @@ -8,7 +8,7 @@ import java.util.Map; // Placeholder for existin Kafka stream repr. the input source for the system. -public class InputTier extends Tier { +public class InputTier extends Tier { Logger logger = LoggerFactory.getLogger(InputTier.class); static final int tierId = 0; diff --git a/manager/src/main/java/com/benblamey/hom/manager/JexlDeploymentTier.java b/manager/src/main/java/com/benblamey/hom/manager/JexlDeploymentTier.java index d3feba8..7294621 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/JexlDeploymentTier.java +++ b/manager/src/main/java/com/benblamey/hom/manager/JexlDeploymentTier.java @@ -1,11 +1,13 @@ package com.benblamey.hom.manager; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; +//@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "className") public class JexlDeploymentTier extends Tier { Logger logger = LoggerFactory.getLogger(JexlDeploymentTier.class); diff --git a/manager/src/main/java/com/benblamey/hom/manager/Manager.java b/manager/src/main/java/com/benblamey/hom/manager/Manager.java index bfe3901..eed2d90 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/Manager.java +++ b/manager/src/main/java/com/benblamey/hom/manager/Manager.java @@ -1,21 +1,33 @@ package com.benblamey.hom.manager; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; + + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.io.*; import java.util.*; + + public class Manager { Logger logger = LoggerFactory.getLogger(Manager.class); - private final List m_tiers = new ArrayList(); + TierSerialization ts = new TierSerialization(); + + private final List m_tiers; public List getTiers() { return m_tiers; } + + public Manager() { + m_tiers = ts.deserializeTiers(); + } + public void cleanup() throws IOException, InterruptedException { // Remove existing deployments. String deployments = Util.executeShellLogAndBlock(new String[]{"kubectl", "get", "deployments"}).stdOut; @@ -64,6 +76,15 @@ public void addJexlTier(String jexlExpression) throws IOException, InterruptedEx String inputTopic = m_tiers.isEmpty() ? "haste-input-data" : m_tiers.get(m_tiers.size() - 1).getOutputTopic(); int tierIndex = m_tiers.size(); Tier tier = new JexlDeploymentTier(jexlExpression, tierIndex, inputTopic); + + ts.serializeTiers(m_tiers); + + //re-serialize the current tiers +// ts.removeOldTiersXml(); +// for (int i = 0; i < m_tiers.size(); i++) { +// Tier tier1 = m_tiers.get(i); +// ts.serializeTier(tier1); +// } m_tiers.add(tier); } @@ -77,6 +98,15 @@ public void addNotebookTier(String filenameAndFunction) throws IOException, Inte int tierIndex = m_tiers.size(); Tier tier = new PyWorkerDeploymentTier(filenameAndFunction, tierIndex, inputTopic); m_tiers.add(tier); + //TierSerialization ts = new TierSerialization(); + //re-serialize the current tiers + ts.serializeTiers(m_tiers); + +// ts.removeOldTiersXml(); +// for (int i = 0; i < m_tiers.size(); i++) { +// Tier tier1 = m_tiers.get(i); +// ts.serializeTier(tier1); +// } } public void removeTier() throws IOException, InterruptedException { @@ -85,18 +115,29 @@ public void removeTier() throws IOException, InterruptedException { } Tier tier = m_tiers.get(m_tiers.size() - 1); - tier.remove(); // TODO - remove old kafka data? m_tiers.remove(tier); + ts.serializeTiers(m_tiers); + +// TierSerialization ts = new TierSerialization(); +// //re-serialize the current tiers +// ts.removeOldTiersXml(); +// for (int i = 0; i < m_tiers.size(); i++) { +// Tier tier1 = m_tiers.get(i); +// ts.serializeTier(tier1); +// } } public void addBaseTier(String topicID) { if (!getTiers().isEmpty()) { throw new RuntimeException("Can only add base tier if no existing tiers"); } +// TierSerialization ts = new TierSerialization(); Tier t = new InputTier(topicID); + ts.serializeTiers(m_tiers); m_tiers.add(t); } + } diff --git a/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java b/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java index ef589ba..56c0b04 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java +++ b/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java @@ -1,5 +1,6 @@ package com.benblamey.hom.manager; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -7,6 +8,7 @@ import org.slf4j.LoggerFactory; import spark.Response; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -32,6 +34,7 @@ public static void main(final String[] args) throws Exception { spark.Spark.get("/", (req, res) -> { logger.info("/"); + //manager.deserializeTiers(); return "The API is running."; }); diff --git a/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java b/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java index c93fb19..dc1199f 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java +++ b/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java @@ -17,15 +17,15 @@ public static List getFunctions(String directory) throws IOException, In "bash", "-ec", // Unescaped: - // grep --extended-regexp --only-matching "^\s*\"\s*def ([^(])+\(" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/" - "grep --extended-regexp --only-matching \"^\\s*\\\"def ([^(])+\\(\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\"" - }, new File(directory), null, true).stdOut; + // grep --extended-regexp --only-matching "^\s*\"\s*def\s*[_a-zA-Z]+\w*\([_a-zA-Z]+\w*\)" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/" + "grep --extended-regexp --only-matching \"^\\s*\\\"def\\s*[_a-zA-Z]+\\w*\\([_a-zA-Z]+\\w*\\)\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\"" + }, new File(directory), null, true).stdOut; return Arrays.stream(stdOut.split("\n")).toList(); } // For testing... public static void main(String[] args) throws IOException, InterruptedException { - getFunctions("/Users/benblamey/projects/github-me/hom-impl-2/persistentvolume"); + getFunctions("C:\\Users\\Savior_Hn\\Desktop\\HASTE-o-MATIC-main\\HASTE-o-MATIC-main\\persistentvolume"); } } diff --git a/manager/src/main/java/com/benblamey/hom/manager/Offsets.java b/manager/src/main/java/com/benblamey/hom/manager/Offsets.java index ba83f4c..1ed4025 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/Offsets.java +++ b/manager/src/main/java/com/benblamey/hom/manager/Offsets.java @@ -75,8 +75,8 @@ static List fetchOffsets() { // System.out.println(result); List offsetInfos = Arrays.stream(result.split("\\n")) .filter(line -> line.startsWith("app-hom-tier-")) - .map(line -> Arrays.stream(line.split("\s+")).toList()) - .map(parts -> new OffsetInfo(parts)) + .map(line -> Arrays.stream(line.split("\\s+")).toList()) + .map(parts -> new OffsetInfo((List) parts)) .toList(); return offsetInfos; } diff --git a/manager/src/main/java/com/benblamey/hom/manager/PyWorkerDeploymentTier.java b/manager/src/main/java/com/benblamey/hom/manager/PyWorkerDeploymentTier.java index 6540cad..3c4f83e 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/PyWorkerDeploymentTier.java +++ b/manager/src/main/java/com/benblamey/hom/manager/PyWorkerDeploymentTier.java @@ -101,10 +101,10 @@ private void createDeployment() throws IOException, InterruptedException { "python3", "-m", "nbconvert", - "/data/"+notebookFilenameWithExt, + CommandLineArguments.getDataPath()+notebookFilenameWithExt, "--output", scriptFileNameAndExtension, - "--output-dir=/data/" + PYWORKER_SCRIPT_DIR, + "--output-dir=" + CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR, //"--execute", // execute prior to export "--to", "python" @@ -121,7 +121,7 @@ private void createDeployment() throws IOException, InterruptedException { List args = new ArrayList(); args.addAll(Arrays.asList( - "sh -c ./data/nb_worker_context.sh", + "sh -c "+CommandLineArguments.getDataPath()+"nb_worker_context.sh", ";", "python3", "-u", @@ -131,7 +131,7 @@ private void createDeployment() throws IOException, InterruptedException { inputTopic, outputTopic, kafkaApplicationID, - "/data/" + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension, + CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension, function)); String yaml = Util.getResourceAsStringFromUTF8("py_worker_tmpl.yaml") diff --git a/manager/src/main/java/com/benblamey/hom/manager/Tier.java b/manager/src/main/java/com/benblamey/hom/manager/Tier.java index f5d28c0..c83b81e 100644 --- a/manager/src/main/java/com/benblamey/hom/manager/Tier.java +++ b/manager/src/main/java/com/benblamey/hom/manager/Tier.java @@ -1,10 +1,27 @@ package com.benblamey.hom.manager; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; + import java.io.IOException; import java.util.Map; +//@JacksonXmlRootElement(localName = "tier") + +//@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "__class") +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.EXISTING_PROPERTY, + property = "type", + visible = true +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = JexlDeploymentTier.class, name = "com.benblamey.hom.manager.JexlDeploymentTier") +}) public abstract class Tier { + String outputTopic; String uniqueTierId; String friendlyTierId; @@ -27,12 +44,12 @@ public abstract class Tier { } private void init() { - String sampleJsonlPath = "/data/sample-tier-" + friendlyTierId + ".jsonl"; + String sampleJsonlPath = CommandLineArguments.getDataPath()+"sample-tier-" + friendlyTierId + ".jsonl"; sampler = new TopicSampler(outputTopic, sampleJsonlPath); try { NotebooksFromTemplates.CreateAnalyzeTierNotebookFromTemplate(sampleJsonlPath, - "/data/analyze-tier-" + friendlyTierId + ".ipynb"); + CommandLineArguments.getDataPath()+"analyze-tier-" + friendlyTierId + ".ipynb"); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java b/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java new file mode 100644 index 0000000..9072906 --- /dev/null +++ b/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java @@ -0,0 +1,102 @@ +package com.benblamey.hom.manager; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; +import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + + +class TierSerialization { + + public static final String SERIALIZED_TIERS_FILENAME = "serializedTiers.json"; + + TierSerialization() { + m_xmlMapper = new ObjectMapper(); + PolymorphicTypeValidator ptv = BasicPolymorphicTypeValidator + .builder() + .allowIfBaseType("com.benblamey.hom") + .allowIfBaseType("java.lang") + .build(); + m_xmlMapper.activateDefaultTyping(ptv); // default to using DefaultTyping.OBJECT_AND_NON_CONCRETE + } + + private final ObjectMapper m_xmlMapper; + + public void serializeTiers(List tiers) { + try { + //String useDir = System.getProperty("user.dir"); + String xmlStr = m_xmlMapper.writerWithDefaultPrettyPrinter().writeValueAsString(tiers); + FileWriter fileWriter = new FileWriter(SERIALIZED_TIERS_FILENAME,false); + fileWriter.write(xmlStr); + //PrintWriter printWriter = new PrintWriter(fileWriter); + //printWriter.println(xmlStr); + fileWriter.close(); // also closes filewriter + + }catch (IOException e) { + throw new RuntimeException(e); + } + } + + public List deserializeTiers() { + if (!new File(SERIALIZED_TIERS_FILENAME).exists()) { + return new ArrayList<>(); + } + + try { +// //FileInputStream fis = new FileInputStream(); + FileReader fr = new FileReader(SERIALIZED_TIERS_FILENAME); +// //Scanner sc = new Scanner(fis); +//// while(sc.hasNextLine()) +// //XmlMapper xmlMapper = new XmlMapper(); +// +// ArrayList foo = new ArrayList(); + + //List tiers = (List) m_xmlMapper.readerFor(new TypeReference>() {}).readValue(fr); + List tiers = (List) m_xmlMapper.readValue(fr, ArrayList.class); + + //List tiers = m_xmlMapper.readValue(fr, foo.getClass()); + + fr.close(); + return tiers; + } catch (IOException e){ + throw new RuntimeException(e); + } + } + + +// public void serializeTier(Tier tier) { +// try { +// String xmlStr = null; +// XmlMapper xmlMapper = new XmlMapper(); +// //String useDir = System.getProperty("user.dir"); +// xmlStr = xmlMapper.writeValueAsString(tier); +// FileWriter fileWriter = new FileWriter("serializedTiers.xml",true); +// PrintWriter printWriter = new PrintWriter(fileWriter); +// printWriter.println(xmlStr); +// printWriter.close(); +// +// }catch (IOException e){ +// e.printStackTrace(); +// } +// } +// +// public void removeOldTiersXml() { +// try { +// File f = new File("serializedTiers.xml"); +// if (f.delete()){ +// System.out.println(f.getName()+" deleted!"); +// } +// else { +// System.out.println("Failed!"); +// } +// }catch (Exception e){ +// e.printStackTrace(); +// } +// } +} + + diff --git a/manager/src/test/java/com/benblamey/hom/manager/ManagerTest.java b/manager/src/test/java/com/benblamey/hom/manager/ManagerTest.java new file mode 100644 index 0000000..706722f --- /dev/null +++ b/manager/src/test/java/com/benblamey/hom/manager/ManagerTest.java @@ -0,0 +1,38 @@ +package com.benblamey.hom.manager; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; + +public class ManagerTest { + + // Note the env variables need to set to get the manager to work outside the container: + // DATA_PATH=/Users/ben/projects/2021.hom-impl-2/persistentvolume/;KAFKA_BOOTSTRAP_SERVER=dummy + + @Test + public void testTiers() throws IOException, InterruptedException { + + boolean deleted = new File(TierSerialization.SERIALIZED_TIERS_FILENAME).delete(); + + Manager m = new Manager(); + + // Manager is now empty. No file to deserialize. + + m.addDemoJexlTier(); + m.addDemoJexlTier(); + m.addDemoJexlTier(); + + // The XML file has been overwritten several times. + // Now we instantiate an additional manager, which will trigger deserialization. + + Manager m2 = new Manager(); + + // we get this error: + // java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `com.benblamey.hom.manager.JexlDeploymentTier` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator) + + // Now we can assert the number of tiers, their details, etc. + + } + +} diff --git a/manager/src/test/java/com/benblamey/hom/manager/TestUtil.java b/manager/src/test/java/com/benblamey/hom/manager/TestUtil.java index 5896c51..0243b1f 100644 --- a/manager/src/test/java/com/benblamey/hom/manager/TestUtil.java +++ b/manager/src/test/java/com/benblamey/hom/manager/TestUtil.java @@ -2,18 +2,18 @@ import com.benblamey.hom.manager.Util; -import org.junit.Test; +import junit.framework.TestCase; +import org.junit.jupiter.api.Test; -public class TestUtil { + + +public class TestUtil extends TestCase { @Test - static void testRandomAlphaString() { + public void testRandomAlphaString() { String str = Util.randomAlphaString(10); assert str.length() == 10; System.out.println(str); } - public static void main(String args[]) { - testRandomAlphaString(); - } }