Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubernetes/k8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ metadata:
name: manager
spec:
containers:
- image: benblamey/hom-impl-2.manager:latest
- image: haoyuan9654/hom-impl-2.manager:latest
# image is local-only atm.
imagePullPolicy: Always
name: manager
Expand Down
3 changes: 3 additions & 0 deletions manager/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions manager/.idea/.name

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions manager/.idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions manager/.idea/gradle.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions manager/.idea/jarRepositories.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions manager/.idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions manager/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
FROM ubuntu:hirsute
FROM ubuntu:jammy
# clean and update sources
RUN apt-get -y update
# Ubuntu 21.04

# with ubuntu:impish, get an error on apt update
Expand All @@ -13,11 +15,11 @@ ENV TZ=Europe/Stockholm
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# Always run update when changing package list, see https://docs.docker.com/develop/develop-images/dockerfile_best-practices/
RUN apt update ; echo 'editthistoforcerun5'
#RUN apt update ; echo 'editthistoforcerun5'

# install curl
RUN apt install -y curl
RUN apt install -y openjdk-16-jre
RUN apt install -y openjdk-18-jre

RUN java --version

Expand All @@ -42,8 +44,8 @@ RUN curl -LO "https://dl.k8s.io/release/v1.22.3/bin/linux/amd64/kubectl"
RUN install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
RUN kubectl version --client

COPY build/libs/hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar
COPY hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar

# /usr/lib/jvm/jdk-17/bin/java -cp output.jar com.benblamey.hom.manager.ManagerMainTest
#ENTRYPOINT ["java","-jar","output.jar"]
ENTRYPOINT ["/bin/bash"]
ENTRYPOINT ["/bin/bash"]
6 changes: 5 additions & 1 deletion manager/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ dependencies {

// https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'

// https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: '2.13.3'
}

// Create a fat JAR to use with Docker, by overriding the default configuration for 'jar':
Expand Down Expand Up @@ -70,4 +73,5 @@ pushDockerImage.dependsOn buildDockerImage

test {
useJUnitPlatform()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ private static String getString(String argumentName) {

private static String getString(String argumentName, String defaultValue) {
// returns null if not specified.
String result = System.getProperty(argumentName);
String result = System.getenv(argumentName);
if (result == null) {
System.out.println(argumentName + " defaulted to " + defaultValue);
return defaultValue;
Expand All @@ -22,10 +22,11 @@ public static String getKafkaBootstrapServerConfig() {
}

public static String getDataPath() {
return getString("DATA_PATH");
return getString("DATA_PATH", "/data/");
}

/**
 * Looks up the {@code MAX_WORKER_REPLICAS} setting via {@code getString}.
 *
 * @return the configured value as a string, or {@code "3"} when it is not specified
 */
public static String getMaxWorkerReplicas() {
    final String maxReplicas = getString("MAX_WORKER_REPLICAS", "3");
    return maxReplicas;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Map;

// Placeholder for the existing Kafka stream representing the input source for the system.
public class InputTier extends Tier {
public class InputTier extends Tier {

Logger logger = LoggerFactory.getLogger(InputTier.class);
static final int tierId = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

//@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "className")
public class JexlDeploymentTier extends Tier {

Logger logger = LoggerFactory.getLogger(JexlDeploymentTier.class);
Expand Down
47 changes: 44 additions & 3 deletions manager/src/main/java/com/benblamey/hom/manager/Manager.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.*;
import java.util.*;



public class Manager {

Logger logger = LoggerFactory.getLogger(Manager.class);

private final List<Tier> m_tiers = new ArrayList<Tier>();
TierSerialization ts = new TierSerialization();

private final List<Tier> m_tiers;

/**
 * Exposes the manager's tier list.
 *
 * @return the same list instance held by this manager (not a copy)
 */
public List<Tier> getTiers() {
    return this.m_tiers;
}


/**
 * Creates a manager whose tier list is whatever
 * {@code TierSerialization.deserializeTiers()} returns.
 */
public Manager() {
    this.m_tiers = this.ts.deserializeTiers();
}

public void cleanup() throws IOException, InterruptedException {
// Remove existing deployments.
String deployments = Util.executeShellLogAndBlock(new String[]{"kubectl", "get", "deployments"}).stdOut;
Expand Down Expand Up @@ -64,6 +76,15 @@ public void addJexlTier(String jexlExpression) throws IOException, InterruptedEx
String inputTopic = m_tiers.isEmpty() ? "haste-input-data" : m_tiers.get(m_tiers.size() - 1).getOutputTopic();
int tierIndex = m_tiers.size();
Tier tier = new JexlDeploymentTier(jexlExpression, tierIndex, inputTopic);

ts.serializeTiers(m_tiers);

//re-serialize the current tiers
// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
m_tiers.add(tier);
}

Expand All @@ -77,6 +98,15 @@ public void addNotebookTier(String filenameAndFunction) throws IOException, Inte
int tierIndex = m_tiers.size();
Tier tier = new PyWorkerDeploymentTier(filenameAndFunction, tierIndex, inputTopic);
m_tiers.add(tier);
//TierSerialization ts = new TierSerialization();
//re-serialize the current tiers
ts.serializeTiers(m_tiers);

// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
}

public void removeTier() throws IOException, InterruptedException {
Expand All @@ -85,18 +115,29 @@ public void removeTier() throws IOException, InterruptedException {
}

Tier tier = m_tiers.get(m_tiers.size() - 1);

tier.remove();
// TODO - remove old kafka data?

m_tiers.remove(tier);
ts.serializeTiers(m_tiers);

// TierSerialization ts = new TierSerialization();
// //re-serialize the current tiers
// ts.removeOldTiersXml();
// for (int i = 0; i < m_tiers.size(); i++) {
// Tier tier1 = m_tiers.get(i);
// ts.serializeTier(tier1);
// }
}

/**
 * Adds the base (input) tier for the given topic and then persists the tier list.
 *
 * @param topicID passed to the {@code InputTier} constructor
 * @throws RuntimeException if any tier already exists
 */
public void addBaseTier(String topicID) {
    if (!getTiers().isEmpty()) {
        throw new RuntimeException("Can only add base tier if no existing tiers");
    }
    Tier t = new InputTier(topicID);
    // Register the tier BEFORE serializing so the persisted list includes it;
    // this matches the mutate-then-serialize order used by addNotebookTier and removeTier.
    m_tiers.add(t);
    ts.serializeTiers(m_tiers);
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Response;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,6 +34,7 @@ public static void main(final String[] args) throws Exception {

spark.Spark.get("/", (req, res) -> {
logger.info("/");
//manager.deserializeTiers();
return "The API is running.";
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ public static List<String> getFunctions(String directory) throws IOException, In
"bash",
"-ec",
// Unescaped:
// grep --extended-regexp --only-matching "^\s*\"\s*def ([^(])+\(" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/"
"grep --extended-regexp --only-matching \"^\\s*\\\"def ([^(])+\\(\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""
}, new File(directory), null, true).stdOut;
// grep --extended-regexp --only-matching "^\s*\"\s*def\s*[_a-zA-Z]+\w*\([_a-zA-Z]+\w*\)" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/"
"grep --extended-regexp --only-matching \"^\\s*\\\"def\\s*[_a-zA-Z]+\\w*\\([_a-zA-Z]+\\w*\\)\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""

}, new File(directory), null, true).stdOut;
return Arrays.stream(stdOut.split("\n")).toList();
}

// For testing...
public static void main(String[] args) throws IOException, InterruptedException {
getFunctions("/Users/benblamey/projects/github-me/hom-impl-2/persistentvolume");
getFunctions("C:\\Users\\Savior_Hn\\Desktop\\HASTE-o-MATIC-main\\HASTE-o-MATIC-main\\persistentvolume");
}
}
4 changes: 2 additions & 2 deletions manager/src/main/java/com/benblamey/hom/manager/Offsets.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ static List<OffsetInfo> fetchOffsets() {
// System.out.println(result);
List<OffsetInfo> offsetInfos = Arrays.stream(result.split("\\n"))
.filter(line -> line.startsWith("app-hom-tier-"))
.map(line -> Arrays.stream(line.split("\s+")).toList())
.map(parts -> new OffsetInfo(parts))
.map(line -> Arrays.stream(line.split("\\s+")).toList())
.map(parts -> new OffsetInfo((List<String>) parts))
.toList();
return offsetInfos;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ private void createDeployment() throws IOException, InterruptedException {
"python3",
"-m",
"nbconvert",
"/data/"+notebookFilenameWithExt,
CommandLineArguments.getDataPath()+notebookFilenameWithExt,
"--output",
scriptFileNameAndExtension,
"--output-dir=/data/" + PYWORKER_SCRIPT_DIR,
"--output-dir=" + CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR,
//"--execute", // execute prior to export
"--to",
"python"
Expand All @@ -121,7 +121,7 @@ private void createDeployment() throws IOException, InterruptedException {
List<String> args = new ArrayList<String>();

args.addAll(Arrays.asList(
"sh -c ./data/nb_worker_context.sh",
"sh -c "+CommandLineArguments.getDataPath()+"nb_worker_context.sh",
";",
"python3",
"-u",
Expand All @@ -131,7 +131,7 @@ private void createDeployment() throws IOException, InterruptedException {
inputTopic,
outputTopic,
kafkaApplicationID,
"/data/" + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension,
CommandLineArguments.getDataPath() + PYWORKER_SCRIPT_DIR + "/" + scriptFileNameAndExtension,
function));

String yaml = Util.getResourceAsStringFromUTF8("py_worker_tmpl.yaml")
Expand Down
11 changes: 9 additions & 2 deletions manager/src/main/java/com/benblamey/hom/manager/Tier.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.benblamey.hom.manager;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;

import java.io.IOException;
import java.util.Map;

//@JacksonXmlRootElement(localName = "tier")

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "__class")
public abstract class Tier {


String outputTopic;
String uniqueTierId;
String friendlyTierId;
Expand All @@ -27,12 +34,12 @@ public abstract class Tier {
}

private void init() {
String sampleJsonlPath = "/data/sample-tier-" + friendlyTierId + ".jsonl";
String sampleJsonlPath = CommandLineArguments.getDataPath()+"sample-tier-" + friendlyTierId + ".jsonl";
sampler = new TopicSampler(outputTopic, sampleJsonlPath);

try {
NotebooksFromTemplates.CreateAnalyzeTierNotebookFromTemplate(sampleJsonlPath,
"/data/analyze-tier-" + friendlyTierId + ".ipynb");
CommandLineArguments.getDataPath()+"analyze-tier-" + friendlyTierId + ".ipynb");
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading