[#1881] fix(client-spark): Move Spark client Uniffle classes under the org.apache.uniffle package to avoid conflicts #1882

Draft · wants to merge 8 commits into base: master
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
@@ -123,7 +123,7 @@ jobs:
docker exec rss-spark-master-1 /bin/bash -c "cat /example.scala | /opt/spark/bin/spark-shell \
--master spark://rss-spark-master-1:7077 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
--conf spark.shuffle.manager=org.apache.uniffle.spark.shuffle.RssShuffleManager \
--conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
--conf spark.rss.storage.type=MEMORY_LOCALFILE \
--conf spark.task.maxFailures=4 \
4 changes: 2 additions & 2 deletions README.md
@@ -252,7 +252,7 @@ Deploy Steps:
# Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of
# serialized object should be used.
spark.serializer org.apache.spark.serializer.KryoSerializer # this could also be in the spark-defaults.conf
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.shuffle.manager org.apache.uniffle.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
# Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
```
@@ -269,7 +269,7 @@ After apply the patch and rebuild spark, add following configuration in spark co
```
For spark3.5 or above just add one more configuration:
```
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo
spark.shuffle.sort.io.plugin.class org.apache.uniffle.spark.shuffle.RssShuffleDataIo
```
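For orientation, a minimal sketch (not part of this diff) of a driver that applies the relocated class names programmatically rather than via spark-defaults.conf. The configuration keys and the org.apache.uniffle.spark.shuffle.* class names are taken from the README changes above; the application name, coordinator addresses, and the Dataset query are placeholders.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class UniffleClientExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("uniffle-client-example")
        // Uniffle sends serialized shuffle data over the network, so a serializer
        // that supports relocation of serialized objects is required.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Fully qualified name after this PR's package relocation.
        .set("spark.shuffle.manager", "org.apache.uniffle.spark.shuffle.RssShuffleManager")
        // Placeholder coordinator quorum; replace with your own addresses.
        .set("spark.rss.coordinator.quorum", "<coordinatorIp1>:19999,<coordinatorIp2>:19999")
        .set("spark.rss.storage.type", "MEMORY_LOCALFILE");
    // For Spark 3.5 or above only (see the README snippet above):
    // conf.set("spark.shuffle.sort.io.plugin.class",
    //     "org.apache.uniffle.spark.shuffle.RssShuffleDataIo");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // Any shuffle-producing query exercises the Uniffle shuffle manager.
    spark.range(0, 1000).groupBy("id").count().show();
    spark.stop();
  }
}
```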

### Deploy MapReduce Client
@@ -44,18 +44,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkException;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.RssStageInfo;
import org.apache.spark.shuffle.RssStageResubmitManager;
import org.apache.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.SparkVersionUtils;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -84,6 +73,17 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.shuffle.BlockIdManager;
import org.apache.uniffle.spark.shuffle.RssShuffleHandle;
import org.apache.uniffle.spark.shuffle.RssSparkConfig;
import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils;
import org.apache.uniffle.spark.shuffle.RssStageInfo;
import org.apache.uniffle.spark.shuffle.RssStageResubmitManager;
import org.apache.uniffle.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.uniffle.spark.shuffle.SparkVersionUtils;
import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo;

import static org.apache.uniffle.common.config.RssClientConf.HADOOP_CONFIG_KEY_PREFIX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REMOTE_STORAGE_USE_LOCAL_CONF_ENABLED;
@@ -21,11 +21,11 @@
import java.util.Map;

import org.apache.spark.SparkException;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;

import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.shuffle.BlockIdManager;
import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo;

/**
* This is a proxy interface that mainly delegates the un-registration of shuffles to the
@@ -29,8 +29,6 @@

import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.StreamObserver;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +40,8 @@
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.ShuffleManagerGrpc.ShuffleManagerImplBase;
import org.apache.uniffle.shuffle.BlockIdManager;
import org.apache.uniffle.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.uniffle.spark.shuffle.handle.StageAttemptShuffleHandleInfo;

public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
@@ -15,17 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import java.util.List;
import java.util.Map;

import org.apache.spark.ShuffleDependency;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.shuffle.ShuffleHandle;

import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo;

public class RssShuffleHandle<K, V, C> extends ShuffleHandle {

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import java.util.Set;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -35,7 +35,8 @@
import org.apache.spark.SparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +56,10 @@
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.spark.shuffle.handle.SimpleShuffleHandleInfo;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.uniffle.common.util.Constants.DRIVER_HOST;
import static org.apache.uniffle.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;

public class RssSparkShuffleUtils {

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

public class RssStageInfo {
private String stageAttemptIdAndNumber;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import java.util.Map;
import java.util.Set;
@@ -15,15 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

import org.apache.spark.shuffle.handle.ShuffleHandleInfo;

import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo;

public class ShuffleHandleInfoManager implements Closeable {
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle;
package org.apache.uniffle.spark.shuffle;

import org.apache.spark.package$;
import org.apache.spark.util.VersionUtils;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.handle;
package org.apache.uniffle.spark.shuffle.handle;

import java.util.ArrayList;
import java.util.Collections;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.handle;
package org.apache.uniffle.spark.shuffle.handle;

import java.util.List;
import java.util.Map;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.handle;
package org.apache.uniffle.spark.shuffle.handle;

import java.io.Serializable;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.handle;
package org.apache.uniffle.spark.shuffle.handle;

import java.io.Serializable;
import java.util.List;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.handle;
package org.apache.uniffle.spark.shuffle.handle;

import java.util.LinkedList;
import java.util.List;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.reader;
package org.apache.uniffle.spark.shuffle.reader;

import java.io.IOException;
import java.util.Objects;
@@ -25,7 +25,6 @@
import scala.collection.Iterator;

import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,6 +35,7 @@
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.spark.shuffle.RssSparkShuffleUtils;

public class RssFetchFailedIterator<K, C> extends AbstractIterator<Product2<K, C>> {
private static final Logger LOG = LoggerFactory.getLogger(RssFetchFailedIterator.class);
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.reader;
package org.apache.uniffle.spark.shuffle.reader;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,7 +33,6 @@
import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,6 +41,7 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.spark.shuffle.RssSparkConfig;

public class RssShuffleDataIterator<K, C> extends AbstractIterator<Product2<K, C>> {

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import java.util.ArrayList;
import java.util.List;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import org.apache.uniffle.common.ShuffleBlockInfo;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import org.apache.uniffle.common.ShuffleBlockInfo;

@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.spark.shuffle.RssSparkConfig;

public class BufferManagerOptions {

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import java.io.Closeable;
import java.io.IOException;
@@ -15,15 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import java.util.List;
import java.util.Map;

import org.apache.spark.shuffle.handle.ShuffleHandleInfo;

import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.spark.shuffle.handle.ShuffleHandleInfo;

/** This class is to get the partition assignment for ShuffleWriter. */
public class TaskAttemptAssignment {
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;
package org.apache.uniffle.spark.shuffle.writer;

import java.io.ByteArrayOutputStream;
