Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use caffeine in CachedStore and adapt ManagedInternalForm for state p… #3607

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions backend/src/main/java/com/bakdata/conquery/Conquery.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import jakarta.validation.Validator;

import ch.qos.logback.classic.Level;
import com.bakdata.conquery.commands.*;
import com.bakdata.conquery.commands.DistributedStandaloneCommand;
import com.bakdata.conquery.commands.ManagerNode;
import com.bakdata.conquery.commands.MigrateCommand;
import com.bakdata.conquery.commands.PreprocessorCommand;
import com.bakdata.conquery.commands.RecodeStoreCommand;
import com.bakdata.conquery.commands.ShardCommand;
import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
import com.bakdata.conquery.metrics.prometheus.PrometheusBundle;
Expand All @@ -12,6 +17,7 @@
import com.bakdata.conquery.mode.cluster.ClusterManagerProvider;
import com.bakdata.conquery.mode.local.LocalManagerProvider;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.configuration.JsonConfigurationFactory;
import io.dropwizard.configuration.SubstitutingSourceProvider;
Expand Down Expand Up @@ -39,6 +45,10 @@ public Conquery() {
this("Conquery");
}

public static void main(String... args) throws Exception {
new Conquery().run(args);
}

@Override
public void initialize(Bootstrap<ConqueryConfig> bootstrap) {
final ObjectMapper confMapper = bootstrap.getObjectMapper();
Expand All @@ -53,12 +63,21 @@ public void initialize(Bootstrap<ConqueryConfig> bootstrap) {
bootstrap.addCommand(new RecodeStoreCommand());
bootstrap.addCommand(new MigrateCommand());

((MutableInjectableValues) confMapper.getInjectableValues()).add(Validator.class, bootstrap.getValidatorFactory().getValidator());
MutableInjectableValues injectableValues = (MutableInjectableValues) confMapper.getInjectableValues();
injectableValues.add(Validator.class, bootstrap.getValidatorFactory().getValidator());
injectableValues.add(MetricRegistry.class, bootstrap.getMetricRegistry());

// do some setup in other classes after initialization but before running a
// command
bootstrap.addBundle(new ConfiguredBundle<>() {

@Override
public void initialize(Bootstrap<?> bootstrap) {
// Allow overriding of config from environment variables.
bootstrap.setConfigurationSourceProvider(new SubstitutingSourceProvider(
bootstrap.getConfigurationSourceProvider(), StringSubstitutor.createInterpolator()));
}

@Override
public void run(ConqueryConfig configuration, Environment environment) {
configuration.configureObjectMapper(environment.getObjectMapper());
Expand All @@ -71,13 +90,6 @@ protected void configure() {
}
});
}

@Override
public void initialize(Bootstrap<?> bootstrap) {
// Allow overriding of config from environment variables.
bootstrap.setConfigurationSourceProvider(new SubstitutingSourceProvider(
bootstrap.getConfigurationSourceProvider(), StringSubstitutor.createInterpolator()));
}
});

bootstrap.addBundle(new PrometheusBundle());
Expand All @@ -101,8 +113,4 @@ public void run(Manager manager) throws InterruptedException {
}
managerNode.run(manager);
}

public static void main(String... args) throws Exception {
new Conquery().run(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void cancel(Subject subject, ManagedExecution query) {

log.info("User[{}] cancelled Query[{}]", subject.getId(), query.getId());

executionManager.cancelQuery(query);
executionManager.cancelExecution(query);
}

public void patchQuery(Subject subject, ManagedExecution execution, MetaDataPatch patch) {
Expand Down Expand Up @@ -263,18 +263,18 @@ public void reexecute(Subject subject, ManagedExecution query) {
if (!query.getState().equals(ExecutionState.RUNNING)) {
final Namespace namespace = query.getNamespace();

namespace.getExecutionManager().execute(query, config);
namespace.getExecutionManager().execute(query);
}
}

public void deleteQuery(Subject subject, ManagedExecution execution) {
log.info("User[{}] deleted Query[{}]", subject.getId(), execution.getId());
public void deleteQuery(Subject subject, ManagedExecutionId execution) {
log.info("User[{}] deleted Query[{}]", subject.getId(), execution);

datasetRegistry.get(execution.getDataset())
.getExecutionManager() // Don't go over execution#getExecutionManager() as that's only set when query is initialized
.clearQueryResults(execution);

storage.removeExecution(execution.getId());
storage.removeExecution(execution);
}

public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit) {
Expand All @@ -285,7 +285,7 @@ public ExecutionState awaitDone(ManagedExecution query, int time, TimeUnit unit)
public FullExecutionStatus getQueryFullStatus(ManagedExecution query, Subject subject, UriBuilder url, Boolean allProviders) {
final Namespace namespace = datasetRegistry.get(query.getDataset());

query.initExecutable(config);
query.initExecutable();

final FullExecutionStatus status = query.buildStatusFull(subject, namespace);

Expand Down Expand Up @@ -332,7 +332,7 @@ public ExternalUploadResult uploadEntities(Subject subject, Dataset dataset, Ext
execution.setLabel(upload.getLabel());
}

execution.initExecutable(config);
execution.initExecutable();

return new ExternalUploadResult(execution.getId(), statistic.getResolved().size(), statistic.getUnresolvedId(), statistic.getUnreadableDate());
}
Expand Down Expand Up @@ -369,9 +369,12 @@ public FullExecutionStatus getSingleEntityExport(Subject subject, UriBuilder uri
throw new ConqueryError.ExecutionProcessingError();
}

// Workaround update our execution as the lastresultcount was set in the background
final EntityPreviewExecution executionFinished = (EntityPreviewExecution) execution.getId().resolve();
executionFinished.initExecutable();

final FullExecutionStatus status = execution.buildStatusFull(subject, namespace);
status.setResultUrls(getResultAssets(config.getResultProviders(), execution, uriBuilder, false));
status.setResultUrls(getResultAssets(config.getResultProviders(), executionFinished, uriBuilder, false));
return status;
}

Expand Down Expand Up @@ -428,21 +431,21 @@ public ManagedExecution postQuery(Dataset dataset, QueryDescription query, Subje

final Optional<ManagedExecution>
execution =
executionId.map(id -> tryReuse(query, id, namespace, config, executionManager, subject.getUser()));
executionId.map(id -> tryReuse(query, id, namespace, executionManager, subject.getUser()));

if (execution.isPresent()) {
return execution.get();
}
}

// Execute the query
return executionManager.runQuery(namespace, query, subject.getId(), config, system);
return executionManager.runQuery(namespace, query, subject.getId(), system);
}

/**
* Determine if the submitted query does reuse ONLY another query and restart that instead of creating another one.
*/
private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId executionId, Namespace namespace, ConqueryConfig config, ExecutionManager executionManager, User user) {
private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId executionId, Namespace namespace, ExecutionManager executionManager, User user) {

ManagedExecution execution = storage.getExecution(executionId);

Expand Down Expand Up @@ -485,7 +488,7 @@ private ManagedExecution tryReuse(QueryDescription query, ManagedExecutionId exe

log.trace("Re-executing Query {}", execution);

executionManager.execute(execution, config);
executionManager.execute(execution);

return execution;

Expand Down Expand Up @@ -571,7 +574,7 @@ public Stream<Map<String, String>> resolveEntities(Subject subject, List<FilterV
.filter(Predicate.not(Map::isEmpty));
}

public ResultStatistics getResultStatistics(SingleTableResult managedQuery) {
public <E extends ManagedExecution & SingleTableResult> ResultStatistics getResultStatistics(E managedQuery) {

final Locale locale = I18n.LOCALE.get();
final NumberFormat decimalFormat = NumberFormat.getNumberInstance(locale);
Expand All @@ -584,6 +587,8 @@ public ResultStatistics getResultStatistics(SingleTableResult managedQuery) {
new PrintSettings(true, locale, managedQuery.getNamespace(), config, null, null, decimalFormat, integerFormat);
final UniqueNamer uniqueNamer = new UniqueNamer(printSettings);

managedQuery.initExecutable();

final List<ResultInfo> resultInfos = managedQuery.getResultInfos();

final Optional<ResultInfo>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.bakdata.conquery.io.cps.CPSTypeIdResolver;
import com.bakdata.conquery.io.cps.SubTyped;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormScanner;
import com.bakdata.conquery.models.forms.frontendconfiguration.FormType;
Expand Down Expand Up @@ -133,8 +134,8 @@ public String getFormType() {
}

@Override
public ManagedExecution toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ExternalExecution(this, user, submittedDataset, storage, datasetRegistry);
public ManagedExecution toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry, ConqueryConfig config) {
return new ExternalExecution(this, user, submittedDataset, storage, datasetRegistry, config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.bakdata.conquery.internationalization.ExportFormC10n;
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.forms.managed.ManagedInternalForm;
import com.bakdata.conquery.models.forms.util.Alignment;
import com.bakdata.conquery.models.forms.util.Resolution;
Expand Down Expand Up @@ -136,8 +137,9 @@ public String getLocalizedTypeLabel() {
}

@Override
public ManagedInternalForm<ExportForm> toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
public ManagedInternalForm<ExportForm> toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry,
ConqueryConfig config) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry, config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.common.Range;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.forms.managed.ManagedInternalForm;
import com.bakdata.conquery.models.i18n.I18n;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
Expand Down Expand Up @@ -126,7 +127,8 @@ public String getLocalizedTypeLabel() {


@Override
public ManagedInternalForm<FullExportForm> toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry);
public ManagedInternalForm<FullExportForm> toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry,
ConqueryConfig config) {
return new ManagedInternalForm<>(this, user, submittedDataset, storage, datasetRegistry, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import java.util.stream.Stream;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.execution.ManagedExecution;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.query.QueryPlanContext;
import com.bakdata.conquery.models.query.QueryResolveContext;
Expand All @@ -27,8 +25,6 @@ public abstract class Query implements QueryDescription {

public abstract QueryPlan<?> createQueryPlan(QueryPlanContext context);

public abstract void collectRequiredQueries(Set<ManagedExecutionId> requiredQueries);

@Override
public abstract void resolve(QueryResolveContext context);

Expand All @@ -38,12 +34,14 @@ public Set<ManagedExecutionId> collectRequiredQueries() {
return set;
}

public abstract void collectRequiredQueries(Set<ManagedExecutionId> requiredQueries);

@JsonIgnore
public abstract List<ResultInfo> getResultInfos();

@Override
public ManagedQuery toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry) {
return new ManagedQuery(this, user, submittedDataset, storage, datasetRegistry);
public ManagedQuery toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry, ConqueryConfig config) {
return new ManagedQuery(this, user, submittedDataset, storage, datasetRegistry, config);
}

/**
Expand All @@ -59,7 +57,6 @@ public CQElement getReusableComponents() {
*
* @param results
* @return the number of results in the result List.
* @see ManagedExecution#finish(ExecutionState, ExecutionManager) for how it's used.
*/
public long countResults(Stream<EntityResult> results) {
return results.map(EntityResult::listResultLines)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.models.auth.entities.Subject;
import com.bakdata.conquery.models.auth.permissions.Ability;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
Expand Down Expand Up @@ -45,13 +46,11 @@ public interface QueryDescription extends Visitable {
* @param user
* @param submittedDataset
* @param storage
* @param config
* @return
*/
ManagedExecution toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry);
ManagedExecution toManagedExecution(UserId user, DatasetId submittedDataset, MetaStorage storage, DatasetRegistry<?> datasetRegistry, ConqueryConfig config);


Set<ManagedExecutionId> collectRequiredQueries();

/**
* Initializes a submitted description using the provided context.
* All parameters that are set in this phase must be annotated with {@link com.bakdata.conquery.io.jackson.View.InternalCommunication}.
Expand All @@ -61,14 +60,14 @@ public interface QueryDescription extends Visitable {

/**
* Allows the implementation to add visitors that traverse the QueryTree.
* All visitors are concatenated so only a single traverse needs to be done.
* All visitors are concatenated so only a single traverse needs to be done.
* @param visitors The structure to which new visitors need to be added.
*/
default void addVisitors(@NonNull List<QueryVisitor> visitors) {
// Register visitors for permission checks
visitors.add(new QueryUtils.ExternalIdChecker());
}

/**
* Check implementation specific permissions. Is called after all visitors have been registered and executed.
*/
Expand Down Expand Up @@ -111,6 +110,8 @@ static void authorizeQuery(QueryDescription queryDescription, Subject subject, D
}
}

Set<ManagedExecutionId> collectRequiredQueries();

default RequiredEntities collectRequiredEntities(QueryExecutionContext context){
return new RequiredEntities(context.getBucketManager().getEntities());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.bakdata.conquery.resources.ResourcesProvider;
import com.bakdata.conquery.resources.admin.AdminServlet;
import com.bakdata.conquery.resources.admin.ShutdownTask;
import com.bakdata.conquery.tasks.LoadStorageTask;
import com.bakdata.conquery.tasks.PermissionCleanupTask;
import com.bakdata.conquery.tasks.QueryCleanupTask;
import com.bakdata.conquery.tasks.ReloadMetaStorageTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;
Expand Down Expand Up @@ -165,18 +165,20 @@ public void loadNamespaces() {

loaders.shutdown();
while (!loaders.awaitTermination(1, TimeUnit.MINUTES)) {
final int coundLoaded = registry.getDatasets().size();
log.debug("Waiting for Worker namespaces to load. {} are already finished. {} pending.", coundLoaded, namespaceStorages.size()
- coundLoaded);
final int countLoaded = registry.getNamespaces().size();
log.debug("Waiting for Worker namespaces to load. {} are already finished. {} pending.", countLoaded, namespaceStorages.size()
- countLoaded);
}
}

private void loadMetaStorage() {
log.info("Opening MetaStorage");
getMetaStorage().openStores(getInternalMapperFactory().createManagerPersistenceMapper(getDatasetRegistry(), getMetaStorage()), getEnvironment().metrics());
log.info("Loading MetaStorage");
getMetaStorage().loadData();
log.info("MetaStorage loaded {}", getMetaStorage());
getMetaStorage().openStores(getInternalMapperFactory().createManagerPersistenceMapper(getDatasetRegistry(), getMetaStorage()));
if (getConfig().getStorage().isLoadStoresOnStart()) {
log.info("Loading MetaStorage");
getMetaStorage().loadData();
log.trace("MetaStorage loaded {}", getMetaStorage());
}
}

private void registerTasks(Manager manager, Environment environment, ConqueryConfig config) {
Expand All @@ -189,7 +191,7 @@ private void registerTasks(Manager manager, Environment environment, ConqueryCon

environment.admin().addTask(new PermissionCleanupTask(getMetaStorage()));
manager.getAdminTasks().forEach(environment.admin()::addTask);
environment.admin().addTask(new ReloadMetaStorageTask(getMetaStorage()));
environment.admin().addTask(new LoadStorageTask(getName(), getMetaStorage(), getDatasetRegistry()));

final ShutdownTask shutdown = new ShutdownTask();
environment.admin().addTask(shutdown);
Expand Down
Loading
Loading