Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubernetes/k8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ metadata:
name: manager
spec:
containers:
- image: benblamey/hom-impl-2.manager:latest
- image: haoyuan9654/hom-impl-2.manager:latest
# image is local-only atm.
imagePullPolicy: Always
name: manager
Expand Down
3 changes: 3 additions & 0 deletions manager/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions manager/.idea/.name

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions manager/.idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions manager/.idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions manager/.idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions manager/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions manager/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
FROM ubuntu:hirsute
FROM ubuntu:jammy
# clean and update sources
RUN apt-get -y update
# Ubuntu 21.04

# with ubuntu:impish, get an error on apt update
Expand All @@ -13,11 +15,11 @@ ENV TZ=Europe/Stockholm
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# Always run update when changing package list, see https://docs.docker.com/develop/develop-images/dockerfile_best-practices/
RUN apt update ; echo 'editthistoforcerun5'
#RUN apt update ; echo 'editthistoforcerun5'

# install curl
RUN apt install -y curl
RUN apt install -y openjdk-16-jre
RUN apt install -y openjdk-18-jre

RUN java --version

Expand All @@ -42,8 +44,8 @@ RUN curl -LO "https://dl.k8s.io/release/v1.22.3/bin/linux/amd64/kubectl"
RUN install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
RUN kubectl version --client

COPY build/libs/hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar
COPY hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar

# /usr/lib/jvm/jdk-17/bin/java -cp output.jar com.benblamey.hom.manager.ManagerMainTest
#ENTRYPOINT ["java","-jar","output.jar"]
ENTRYPOINT ["/bin/bash"]
ENTRYPOINT ["/bin/bash"]
6 changes: 5 additions & 1 deletion manager/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ dependencies {

// https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'

// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: '2.13.3'
}

// Create a fat JAR to use with Docker, by overriding the default configuration for 'jar':
Expand Down Expand Up @@ -70,4 +73,5 @@ pushDockerImage.dependsOn buildDockerImage

test {
useJUnitPlatform()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ private static String getString(String argumentName) {

private static String getString(String argumentName, String defaultValue) {
// returns null if not specified.
String result = System.getProperty(argumentName);
String result = System.getenv(argumentName);
if (result == null) {
System.out.println(argumentName + " defaulted to " + defaultValue);
return defaultValue;
Expand All @@ -22,10 +22,11 @@ public static String getKafkaBootstrapServerConfig() {
}

public static String getDataPath() {
return getString("DATA_PATH");
return getString("DATA_PATH", "/data/");
}

/**
 * Looks up the {@code MAX_WORKER_REPLICAS} setting through {@code getString}.
 *
 * @return the value produced by {@code getString}; {@code "3"} is passed as the
 *         default to use when the setting is not specified
 */
public static String getMaxWorkerReplicas() {
    final String maxWorkerReplicas = getString("MAX_WORKER_REPLICAS", "3");
    return maxWorkerReplicas;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Map;

// Placeholder for the existing Kafka stream representing the input source for the system.
public class InputTier extends Tier {
public class InputTier extends Tier {

Logger logger = LoggerFactory.getLogger(InputTier.class);
static final int tierId = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

//@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "className")
public class JexlDeploymentTier extends Tier {

Logger logger = LoggerFactory.getLogger(JexlDeploymentTier.class);
Expand Down
47 changes: 44 additions & 3 deletions manager/src/main/java/com/benblamey/hom/manager/Manager.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.*;
import java.util.*;



public class Manager {

Logger logger = LoggerFactory.getLogger(Manager.class);

private final List<Tier> m_tiers = new ArrayList<Tier>();
TierSerialization ts = new TierSerialization();

private final List<Tier> m_tiers;

/**
 * Gives access to the tiers held by this manager.
 *
 * @return the manager's own tier list (the field itself, not a copy)
 */
public List<Tier> getTiers() {
    return this.m_tiers;
}


/**
 * Creates a manager whose tier list is initialised with whatever
 * {@code TierSerialization.deserializeTiers()} returns.
 */
public Manager() {
    this.m_tiers = this.ts.deserializeTiers();
}

public void cleanup() throws IOException, InterruptedException {
// Remove existing deployments.
String deployments = Util.executeShellLogAndBlock(new String[]{"kubectl", "get", "deployments"}).stdOut;
Expand Down Expand Up @@ -64,6 +76,15 @@ public void addJexlTier(String jexlExpression) throws IOException, InterruptedEx
String inputTopic = m_tiers.isEmpty() ? "haste-input-data" : m_tiers.get(m_tiers.size() - 1).getOutputTopic();
int tierIndex = m_tiers.size();
Tier tier = new JexlDeploymentTier(jexlExpression, tierIndex, inputTopic);

ts.serializeTiers(m_tiers);

//re-serialize the current tiers
// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
m_tiers.add(tier);
}

Expand All @@ -77,6 +98,15 @@ public void addNotebookTier(String filenameAndFunction) throws IOException, Inte
int tierIndex = m_tiers.size();
Tier tier = new PyWorkerDeploymentTier(filenameAndFunction, tierIndex, inputTopic);
m_tiers.add(tier);
//TierSerialization ts = new TierSerialization();
//re-serialize the current tiers
ts.serializeTiers(m_tiers);

// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
}

public void removeTier() throws IOException, InterruptedException {
Expand All @@ -85,18 +115,29 @@ public void removeTier() throws IOException, InterruptedException {
}

Tier tier = m_tiers.get(m_tiers.size() - 1);

tier.remove();
// TODO - remove old kafka data?

m_tiers.remove(tier);
ts.serializeTiers(m_tiers);

// TierSerialization ts = new TierSerialization();
// //re-serialize the current tiers
// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
}

/**
 * Adds the base (input) tier for the given topic and persists the updated tier list.
 *
 * @param topicID identifier passed to the {@code InputTier} constructor
 * @throws RuntimeException if any tier already exists; the base tier must be the first one
 */
public void addBaseTier(String topicID) {
    if (!getTiers().isEmpty()) {
        throw new RuntimeException("Can only add base tier if no existing tiers");
    }
    Tier t = new InputTier(topicID);
    // Add before serializing: serializing first would persist the list without the
    // new base tier, so the saved state would not include it.
    m_tiers.add(t);
    ts.serializeTiers(m_tiers);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Response;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,6 +34,7 @@ public static void main(final String[] args) throws Exception {

spark.Spark.get("/", (req, res) -> {
logger.info("/");
//manager.deserializeTiers();
return "The API is running.";
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ public static List<String> getFunctions(String directory) throws IOException, In
"bash",
"-ec",
// Unescaped:
// grep --extended-regexp --only-matching "^\s*\"\s*def ([^(])+\(" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/"
"grep --extended-regexp --only-matching \"^\\s*\\\"def ([^(])+\\(\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""
}, new File(directory), null, true).stdOut;
// grep --extended-regexp --only-matching "^\s*\"def\s*[_a-zA-Z]+\w*\([_a-zA-Z]+\w*\)" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1::\2/"
"grep --extended-regexp --only-matching \"^\\s*\\\"def\\s*[_a-zA-Z]+\\w*\\([_a-zA-Z]+\\w*\\)\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""

}, new File(directory), null, true).stdOut;
return Arrays.stream(stdOut.split("\n")).toList();
}

// For testing...
public static void main(String[] args) throws IOException, InterruptedException {
getFunctions("/Users/benblamey/projects/github-me/hom-impl-2/persistentvolume");
getFunctions("C:\\Users\\Savior_Hn\\Desktop\\HASTE-o-MATIC-main\\HASTE-o-MATIC-main\\persistentvolume");
}
}
4 changes: 2 additions & 2 deletions manager/src/main/java/com/benblamey/hom/manager/Offsets.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ static List<OffsetInfo> fetchOffsets() {
// System.out.println(result);
List<OffsetInfo> offsetInfos = Arrays.stream(result.split("\\n"))
.filter(line -> line.startsWith("app-hom-tier-"))
.map(line -> Arrays.stream(line.split("\s+")).toList())
.map(parts -> new OffsetInfo(parts))
.map(line -> Arrays.stream(line.split("\\s+")).toList())
.map(parts -> new OffsetInfo((List<String>) parts))
.toList();
return offsetInfos;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ private void createDeployment() throws IOException, InterruptedException {
"python3",
"-m",
"nbconvert",
"/data/"+notebookFilenameWithExt,
CommandLineArguments.getDataPath()+notebookFilenameWithExt,
"--output",
scriptFileNameAndExtension,
"--output-dir=/data/" + PYWORKER_SCRIPT_DIR,
"--output-dir=" + CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR,
//"--execute", // execute prior to export
"--to",
"python"
Expand All @@ -121,7 +121,7 @@ private void createDeployment() throws IOException, InterruptedException {
List<String> args = new ArrayList<String>();

args.addAll(Arrays.asList(
"sh -c ./data/nb_worker_context.sh",
"sh -c "+CommandLineArguments.getDataPath()+"nb_worker_context.sh",
";",
"python3",
"-u",
Expand All @@ -131,7 +131,7 @@ private void createDeployment() throws IOException, InterruptedException {
inputTopic,
outputTopic,
kafkaApplicationID,
"/data/" + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension,
CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension,
function));

String yaml = Util.getResourceAsStringFromUTF8("py_worker_tmpl.yaml")
Expand Down
21 changes: 19 additions & 2 deletions manager/src/main/java/com/benblamey/hom/manager/Tier.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;

import java.io.IOException;
import java.util.Map;

//@JacksonXmlRootElement(localName = "tier")

//@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "__class")
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "type",
visible = true
)
@JsonSubTypes({
@JsonSubTypes.Type(value = JexlDeploymentTier.class, name = "com.benblamey.hom.manager.JexlDeploymentTier")
})
public abstract class Tier {


String outputTopic;
String uniqueTierId;
String friendlyTierId;
Expand All @@ -27,12 +44,12 @@ public abstract class Tier {
}

private void init() {
String sampleJsonlPath = "/data/sample-tier-" + friendlyTierId + ".jsonl";
String sampleJsonlPath = CommandLineArguments.getDataPath()+"sample-tier-" + friendlyTierId + ".jsonl";
sampler = new TopicSampler(outputTopic, sampleJsonlPath);

try {
NotebooksFromTemplates.CreateAnalyzeTierNotebookFromTemplate(sampleJsonlPath,
"/data/analyze-tier-" + friendlyTierId + ".ipynb");
CommandLineArguments.getDataPath()+"analyze-tier-" + friendlyTierId + ".ipynb");
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading