Fix writing of model in the local file system #528

Merged
merged 1 commit into from
Nov 17, 2020

sanmitra
Contributor

Related issues
Model loading on a cluster was failing with the error: op-model.json/part-00000 did not exist.

Additional context
In #516 we changed the writer to save the model locally and zip it before moving it to the remote path. As a result, the path passed to the saveImpl method below is on the local file system, so when we try to save it as a Hadoop file, op-model.json/part-00000 is not written out.

override protected def saveImpl(path: String): Unit = {
  JobGroupUtil.withJobGroup(OpStep.ModelIO) {
    sc.parallelize(Seq(toJsonString(path)), 1)
      .saveAsTextFile(OpWorkflowModelReadWriteShared.jsonPath(path), classOf[GzipCodec])
  }(this.sparkSession)
}

@salesforce-cla

Thanks for the contribution! It looks like @ijeri is an internal user so signing the CLA is not required. However, we need to confirm this.

@codecov

codecov bot commented Nov 11, 2020

Codecov Report

Merging #528 (7501ab5) into master (bc507f2) will increase coverage by 4.26%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master     #528      +/-   ##
==========================================
+ Coverage   82.47%   86.74%   +4.26%     
==========================================
  Files         347      347              
  Lines       11952    11956       +4     
  Branches      377      384       +7     
==========================================
+ Hits         9858    10371     +513     
+ Misses       2094     1585     -509     
Impacted Files Coverage Δ
...cala/com/salesforce/op/OpWorkflowModelWriter.scala 100.00% <100.00%> (ø)
...sforce/op/stages/impl/feature/Transmogrifier.scala 98.05% <0.00%> (+0.83%) ⬆️
...esforce/op/features/types/FeatureTypeFactory.scala 99.13% <0.00%> (+0.86%) ⬆️
...la/com/salesforce/op/features/FeatureBuilder.scala 35.17% <0.00%> (+1.37%) ⬆️
...la/com/salesforce/op/stages/OpPipelineStages.scala 63.88% <0.00%> (+1.38%) ⬆️
...rce/op/stages/impl/preparators/SanityChecker.scala 90.57% <0.00%> (+1.63%) ⬆️
...scala/com/salesforce/op/testkit/RandomStream.scala 100.00% <0.00%> (+2.38%) ⬆️
...op/stages/DefaultOpPipelineStageReaderWriter.scala 69.23% <0.00%> (+2.56%) ⬆️
.../op/stages/impl/feature/PercentileCalibrator.scala 96.87% <0.00%> (+3.12%) ⬆️
.../scala/com/salesforce/op/dsl/RichTextFeature.scala 82.19% <0.00%> (+4.10%) ⬆️
... and 47 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

val localPath = new Path(modelStagingDir)
val conf = new Configuration()
val localFileSystem = FileSystem.getLocal(conf)
if (shouldOverwrite) localFileSystem.delete(localPath, true)
Contributor

Is there a way to do the overwrite as an atomic transaction, so that we can't end up in a state where the old model was deleted but the new model failed to save?
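
One common way to approach the atomicity question above, sketched for a single local file: write the new content to a temporary sibling file and then rename it over the target, so that a failed write leaves the old file in place. This is a minimal sketch and not what this PR does. writeAtomically is a hypothetical helper, it uses java.nio instead of the Hadoop FileSystem API to stay dependency-free, and it assumes a file system where rename is atomic (object stores such as S3 generally do not offer that).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicOverwriteSketch {
  // Hypothetical helper (not part of TransmogrifAI): replace `target` with
  // `content` without ever leaving a half-written file at `target`.
  def writeAtomically(target: Path, content: String): Unit = {
    val dir = target.toAbsolutePath.getParent
    Files.createDirectories(dir)
    // The temp file lives in the same directory so the final move is a rename
    // within one file system.
    val tmp = Files.createTempFile(dir, target.getFileName.toString, ".tmp")
    try {
      Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
      // With ATOMIC_MOVE, whether an existing target is replaced is
      // implementation specific; on typical POSIX file systems it is.
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // No-op after a successful move; cleans up the temp file on failure.
      Files.deleteIfExists(tmp)
    }
  }
}
```

If the write fails partway, only the temporary file is affected and the previous target stays readable. The same idea extends to a staging directory only if the destination supports an atomic directory rename.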


val modelJson = toJsonString(raw.toString)
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString)
val out = localFileSystem.create(new Path(jsonPath, "part-00000"), shouldOverwrite)
Contributor

It feels odd to hardcode the file name part-00000 here. Is there a way to let the Hadoop file system determine the naming for us? Would we run into problems if the file is too large and Hadoop wants to split it into many part-xxxxx files?

Contributor Author

I initially thought the model reader needed part-00000, but it looks like it does not: https://github.com/salesforce/TransmogrifAI/blob/master/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala#L282. So I will remove it, and the file will be just op-model.json.

We write modelJson to the local file system, zip it, and then move it to the remote S3 path. The bug was that we were trying to write modelJson with:
sc.parallelize(Seq(modelJson)).saveAsTextFile(jsonPath)

The above code only works if the path is on a Hadoop file system. To write to the local file system I am using standard file operations instead, so the output is not split into part-xxxxx files; modelJson is not big data.
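
To illustrate the point about standard file operations: a plain write produces exactly one file with the name we choose, with no part-xxxxx splitting. The sketch below is an assumption-laden stand-in, not the PR's code. The PR goes through Hadoop's local FileSystem; here java.nio is used so the example has no dependencies, and writeJson and the staging directory are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object LocalJsonWriteSketch {
  // Hypothetical helper: write the model JSON as one file named op-model.json
  // inside a local staging directory and return its path.
  def writeJson(stagingDir: Path, json: String): Path = {
    Files.createDirectories(stagingDir)
    val out = stagingDir.resolve("op-model.json")
    // Default options create the file or truncate an existing one.
    Files.write(out, json.getBytes(StandardCharsets.UTF_8))
    out
  }
}
```

Because the write happens in the calling JVM, the file lands on the machine running this code, which is what the later zip-and-move step expects.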

val localPath = new Path(modelStagingDir)
val conf = new Configuration()
val localFileSystem = FileSystem.getLocal(conf)
if (shouldOverwrite) localFileSystem.delete(localPath, true)
Contributor

I think this line is no longer necessary now that we have val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite) a bit below.

Contributor Author

I think we should always call localFileSystem.delete(localPath, true), irrespective of shouldOverwrite. shouldOverwrite should apply only to the remote path, i.e. the path passed to saveImpl.

Contributor

👍 Right, they're independent


val modelJson = toJsonString(raw.toString)
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString)
val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite)
Contributor

Should we set shouldOverwrite to true? If destinationFileSystem.moveFromLocalFile(compressed, finalPath) below ever fails to delete after copying we'll need manual intervention.

Contributor Author

saveImpl is called from the save function, which handles the case where destinationFileSystem.moveFromLocalFile(compressed, finalPath) fails to delete:

def save(path: String): Unit = {
  new FileSystemOverwrite().handleOverwrite(path, shouldOverwrite, sc)
  saveImpl(path)
}


private[ml] class FileSystemOverwrite extends Logging {

  def handleOverwrite(path: String, shouldOverwrite: Boolean, sc: SparkContext): Unit = {
    val hadoopConf = sc.hadoopConfiguration
    val outputPath = new Path(path)
    val fs = outputPath.getFileSystem(hadoopConf)
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    if (fs.exists(qualifiedOutputPath)) {
      if (shouldOverwrite) {
        logInfo(s"Path $path already exists. It will be overwritten.")
        // TODO: Revert back to the original content if save is not successful.
        fs.delete(qualifiedOutputPath, true)
      } else {
        throw new IOException(s"Path $path already exists. To overwrite it, " +
          s"please use write.overwrite().save(path) for Scala and use " +
          s"write().overwrite().save(path) for Java and Python.")
      }
    }
  }
}

Contributor

I was referring to doing localFileSystem.create(new Path(jsonPath), true) -- which you already changed -- not the overwriting on the s3 path.

val out = localFileSystem.create(new Path(jsonPath), shouldOverwrite)
val os = new BufferedOutputStream(out)
os.write(modelJson.getBytes("UTF-8"))
os.close()
Contributor

@nicodv, Nov 12, 2020

We need to make sure to close even in case of exceptions. Something along the lines of try...finally or https://www.scala-lang.org/api/current/scala/util/Using$.html (2.13 only, though).
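
For Scala versions before 2.13, the try...finally suggestion above is often wrapped in a small loan-pattern helper. The following is a minimal sketch of that idea; withResource is a hypothetical name, not something from this code base, and it does nothing special if close() itself throws.

```scala
import java.io.{ByteArrayOutputStream, Closeable}
import java.nio.charset.StandardCharsets

object CloseSketch {
  // Hypothetical loan-pattern helper: run `body` with the resource and close
  // it afterwards, whether `body` returns normally or throws.
  def withResource[R <: Closeable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  // Usage example with an in-memory stream standing in for the file stream.
  def demo(): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    withResource(sink) { os =>
      os.write("{}".getBytes(StandardCharsets.UTF_8))
    }
    sink.toByteArray
  }
}
```

In the writer this would wrap the stream returned by localFileSystem.create, so the stream is closed even when the write fails.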

Contributor Author

sure

@gerashegalov
Contributor

@ijeri can you give an example of the local path? Do you mean because the path is not absolute?

Saving RDD to a local path should work even if the local filesystem is not the default one:

scala> sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://127.0.0.1/")
scala> sc.parallelize(Seq(1,2,3)).saveAsTextFile("file:///tmp/test5")

@gerashegalov
Contributor

gerashegalov commented Nov 12, 2020

+1 to stop gzipping op-model json since we now zip the whole bundle

@sanmitra
Contributor Author

@gerashegalov the file path is a qualified one, e.g. file:/a.../b..../op-model.json

@gerashegalov
Contributor

Makes sense: once the RDD is distributed, its partitions are written out to the worker nodes' local dirs.

Contributor

@gerashegalov left a comment

LGTM, some comments

localFileSystem.delete(localPath, true)
val raw = new Path(localPath, WorkflowFileReader.rawModel)

val modelJson = toJsonString(raw.toString)
Contributor

DRY: avoid repeated toString


val modelJson = toJsonString(raw.toString)
val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString)
val os = new BufferedOutputStream(localFileSystem.create(new Path(jsonPath)))
Contributor

Buffering is superfluous. Let's use the FSDataOutputStream directly. It's actually backed by a BufferedOutputStream.

val jsonPath = OpWorkflowModelReadWriteShared.jsonPath(raw.toString)
val os = new BufferedOutputStream(localFileSystem.create(new Path(jsonPath)))
try {
os.write(modelJson.getBytes("UTF-8"))
Contributor

We can use a constant here: StandardCharsets.UTF_8.toString
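
A small sketch of what the constant buys us, using a made-up JSON string: StandardCharsets.UTF_8.toString yields the charset name, so it can replace the string literal directly, and String.getBytes also has an overload that takes the Charset itself. The object and method names below are hypothetical.

```scala
import java.nio.charset.StandardCharsets

object CharsetSketch {
  // Drop-in replacement for getBytes("UTF-8"): same String overload, but the
  // name comes from a constant instead of a literal.
  def encodeByName(s: String): Array[Byte] =
    s.getBytes(StandardCharsets.UTF_8.toString)

  // Alternative: the Charset overload, which does not declare
  // UnsupportedEncodingException.
  def encodeByCharset(s: String): Array[Byte] =
    s.getBytes(StandardCharsets.UTF_8)
}
```

Both variants produce the same bytes; which one reads better in the writer is a matter of taste.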

@@ -232,7 +232,12 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec with PassengerSparkFixtureTest
dirFile.isDirectory shouldBe true
// TODO: maybe do a thorough files inspection here
val files = FileUtils.listFiles(dirFile, null, true)
files.asScala.map(_.toString).exists(_.contains("_SUCCESS")) shouldBe true
if (outFile.getAbsolutePath.endsWith("/model")) {
Contributor

maybe:

      val fileNames = files.asScala.map(_.getName)
      if (outFile.getAbsolutePath.endsWith("/model")) {
        fileNames should contain ("op-model.json")
      }
      else {
        fileNames should contain ("_SUCCESS")
      }

@sanmitra
Contributor Author

@gerashegalov I have addressed your comments. Please take a look again.

Contributor

@gerashegalov left a comment

LGTM

@salesforce-cla

Thanks for the contribution! Unfortunately we can't verify the commit author(s): Sanmitra Ijeri <s***@s***.com>. One possible solution is to add that email to your GitHub account. Alternatively you can change your commits to another email and force push the change. After getting your commits associated with your GitHub account, refresh the status of this Pull Request.

@sanmitra sanmitra merged commit 13ad9cd into master Nov 17, 2020
@sanmitra sanmitra deleted the san/modelLoadFix branch November 17, 2020 21:02
feifjiang pushed a commit to feifjiang/TransmogrifAI that referenced this pull request Dec 2, 2020
replace tuple with case class for misclassification per category

fixed ModelSelectorSummaryTest

1) Use binary search impl from Scala 2)increased scoring time in OpWorkflowModelLocalTest 3)renaming confusion matrix calculation params

Fix writing of model in the local file system (salesforce#528)

test user cla:missing

update RecordInisghtsLOCOTest Threshold

Fix writing of model in the local file system (salesforce#528)

test user cla:missing
feifjiang pushed a commit to feifjiang/TransmogrifAI that referenced this pull request Dec 4, 2020
replace tuple with case class for misclassification per category

fixed ModelSelectorSummaryTest

1) Use binary search impl from Scala 2)increased scoring time in OpWorkflowModelLocalTest 3)renaming confusion matrix calculation params

Fix writing of model in the local file system (salesforce#528)

test user cla:missing

update RecordInisghtsLOCOTest Threshold

Fix writing of model in the local file system (salesforce#528)

test user cla:missing

increased threshold to 1.0 to fix this flaky test

Fix writing of model in the local file system (salesforce#528)

(1)set the default value of topN classes 10 (2)added confMatrixClassIndices attribute